From 2a766e28c8a7443c09671d870c99beb8b712d0af Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Mon, 30 Sep 2024 09:11:25 -0700 Subject: [PATCH] [ETL-611] Raw sync lambda (#141) * initial commit raw sync lambda * add stacks for raw sync lambda * add publish to sns functionality to raw sync lambda * Add cloudwatch events rule for raw sync lambda * raw sync lambda minor improvements --- .github/workflows/upload-and-deploy.yaml | 5 +- .../namespaced/events-rule-raw-sync.yaml | 12 + .../develop/namespaced/lambda-dispatch.yaml | 1 + .../namespaced/lambda-raw-sync-role.yaml | 11 + .../develop/namespaced/lambda-raw-sync.yaml | 18 + .../prod/namespaced/events-rule-raw-sync.yaml | 12 + config/prod/namespaced/lambda-dispatch.yaml | 1 + .../prod/namespaced/lambda-raw-sync-role.yaml | 11 + config/prod/namespaced/lambda-raw-sync.yaml | 18 + src/lambda_function/raw_sync/README.md | 38 ++ src/lambda_function/raw_sync/app.py | 526 +++++++++++++++ src/lambda_function/raw_sync/template.yaml | 65 ++ templates/events-rule.yaml | 47 ++ templates/lambda-raw-sync-role.yaml | 71 ++ tests/test_lambda_raw_sync.py | 614 ++++++++++++++++++ 15 files changed, 1448 insertions(+), 2 deletions(-) create mode 100644 config/develop/namespaced/events-rule-raw-sync.yaml create mode 100644 config/develop/namespaced/lambda-raw-sync-role.yaml create mode 100644 config/develop/namespaced/lambda-raw-sync.yaml create mode 100644 config/prod/namespaced/events-rule-raw-sync.yaml create mode 100644 config/prod/namespaced/lambda-raw-sync-role.yaml create mode 100644 config/prod/namespaced/lambda-raw-sync.yaml create mode 100644 src/lambda_function/raw_sync/README.md create mode 100644 src/lambda_function/raw_sync/app.py create mode 100644 src/lambda_function/raw_sync/template.yaml create mode 100644 templates/events-rule.yaml create mode 100644 templates/lambda-raw-sync-role.yaml create mode 100644 tests/test_lambda_raw_sync.py diff --git a/.github/workflows/upload-and-deploy.yaml b/.github/workflows/upload-and-deploy.yaml index bd4f2837..54569e50 100755 --- a/.github/workflows/upload-and-deploy.yaml +++ b/.github/workflows/upload-and-deploy.yaml @@ -136,12 +136,13 @@ jobs: - name: Test scripts with pytest (lambda, etc.) run: | - pipenv run python -m pytest \ + pipenv run python -m pytest -v \ tests/test_s3_event_config_lambda.py \ tests/test_s3_to_glue_lambda.py \ tests/test_lambda_dispatch.py \ tests/test_consume_logs.py \ - tests/test_lambda_raw.py -v + tests/test_lambda_raw.py \ + tests/test_lambda_raw_sync.py - name: Test dev synapse folders for STS access with pytest run: > diff --git a/config/develop/namespaced/events-rule-raw-sync.yaml b/config/develop/namespaced/events-rule-raw-sync.yaml new file mode 100644 index 00000000..5cbaf74e --- /dev/null +++ b/config/develop/namespaced/events-rule-raw-sync.yaml @@ -0,0 +1,12 @@ +template: + path: events-rule.yaml +stack_name: "{{ stack_group_config.namespace }}-events-rule-raw-sync" +dependencies: + - develop/namespaced/lambda-raw-sync.yaml +parameters: + RuleName: "{{ stack_group_config.namespace }}-lambda-raw-sync-trigger" + RuleState: DISABLED + LambdaArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-sync::RawSyncFunctionArn" + CronSchedule: cron(0 0 * * ? 
*) +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/lambda-dispatch.yaml b/config/develop/namespaced/lambda-dispatch.yaml index acbd4d37..62096d4d 100644 --- a/config/develop/namespaced/lambda-dispatch.yaml +++ b/config/develop/namespaced/lambda-dispatch.yaml @@ -6,6 +6,7 @@ template: dependencies: - develop/namespaced/lambda-dispatch-role.yaml - develop/namespaced/sqs-input-to-dispatch.yaml + - develop/namespaced/sns-dispatch.yaml - develop/s3-cloudformation-bucket.yaml stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch" parameters: diff --git a/config/develop/namespaced/lambda-raw-sync-role.yaml b/config/develop/namespaced/lambda-raw-sync-role.yaml new file mode 100644 index 00000000..013127ad --- /dev/null +++ b/config/develop/namespaced/lambda-raw-sync-role.yaml @@ -0,0 +1,11 @@ +template: + path: lambda-raw-sync-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync-role" +dependencies: + - develop/namespaced/sns-dispatch.yaml +parameters: + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + S3TargetBucketName: {{ stack_group_config.raw_bucket_name }} + SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/lambda-raw-sync.yaml b/config/develop/namespaced/lambda-raw-sync.yaml new file mode 100644 index 00000000..c1256b85 --- /dev/null +++ b/config/develop/namespaced/lambda-raw-sync.yaml @@ -0,0 +1,18 @@ +template: + type: sam + path: src/lambda_function/raw_sync/template.yaml + artifact_bucket_name: {{ stack_group_config.template_bucket_name }} + artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda" +dependencies: + - develop/namespaced/lambda-raw-sync-role.yaml + - develop/namespaced/sns-dispatch.yaml + - develop/s3-cloudformation-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync" +parameters: + RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-sync-role::RoleArn" + S3InputBucket: {{ stack_group_config.input_bucket_name }} + S3InputKeyPrefix: "{{ stack_group_config.namespace }}/" + S3RawBucket: {{ stack_group_config.raw_bucket_name }} + S3RawKeyPrefix: "{{ stack_group_config.namespace }}/json/" + SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/events-rule-raw-sync.yaml b/config/prod/namespaced/events-rule-raw-sync.yaml new file mode 100644 index 00000000..4657a390 --- /dev/null +++ b/config/prod/namespaced/events-rule-raw-sync.yaml @@ -0,0 +1,12 @@ +template: + path: events-rule.yaml +stack_name: "{{ stack_group_config.namespace }}-events-rule-raw-sync" +dependencies: + - prod/namespaced/lambda-raw-sync.yaml +parameters: + RuleName: "{{ stack_group_config.namespace }}-lambda-raw-sync-trigger" + RuleState: DISABLED + LambdaArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-sync::RawSyncFunctionArn" + CronSchedule: cron(0 0 * * ? 
*) +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/lambda-dispatch.yaml b/config/prod/namespaced/lambda-dispatch.yaml index 3320b63b..7f82d750 100644 --- a/config/prod/namespaced/lambda-dispatch.yaml +++ b/config/prod/namespaced/lambda-dispatch.yaml @@ -6,6 +6,7 @@ template: dependencies: - prod/namespaced/lambda-dispatch-role.yaml - prod/namespaced/sqs-input-to-dispatch.yaml + - prod/namespaced/sns-dispatch.yaml - prod/s3-cloudformation-bucket.yaml stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch" parameters: diff --git a/config/prod/namespaced/lambda-raw-sync-role.yaml b/config/prod/namespaced/lambda-raw-sync-role.yaml new file mode 100644 index 00000000..d83efe5b --- /dev/null +++ b/config/prod/namespaced/lambda-raw-sync-role.yaml @@ -0,0 +1,11 @@ +template: + path: lambda-raw-sync-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync-role" +dependencies: + - prod/namespaced/sns-dispatch.yaml +parameters: + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + S3TargetBucketName: {{ stack_group_config.raw_bucket_name }} + SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/lambda-raw-sync.yaml b/config/prod/namespaced/lambda-raw-sync.yaml new file mode 100644 index 00000000..5676560b --- /dev/null +++ b/config/prod/namespaced/lambda-raw-sync.yaml @@ -0,0 +1,18 @@ +template: + type: sam + path: src/lambda_function/raw_sync/template.yaml + artifact_bucket_name: {{ stack_group_config.template_bucket_name }} + artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda" +dependencies: + - prod/namespaced/lambda-raw-sync-role.yaml + - prod/namespaced/sns-dispatch.yaml + - prod/s3-cloudformation-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync" +parameters: + RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-sync-role::RoleArn" + S3InputBucket: {{ stack_group_config.input_bucket_name }} + S3InputKeyPrefix: "{{ stack_group_config.namespace }}/" + S3RawBucket: {{ stack_group_config.raw_bucket_name }} + S3RawKeyPrefix: "{{ stack_group_config.namespace }}/json/" + SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn" +stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/raw_sync/README.md b/src/lambda_function/raw_sync/README.md new file mode 100644 index 00000000..945b392f --- /dev/null +++ b/src/lambda_function/raw_sync/README.md @@ -0,0 +1,38 @@ +# Raw Sync Lambda + +The raw sync Lambda ensures that the input and raw S3 buckets are synchronized +by verifying that all non-zero sized JSON in each export in the input bucket +have a corresponding object in the raw bucket. + +If a JSON file from an export is found to not have a corresponding object in the raw bucket, +the file is submitted to the raw Lambda (via the dispatch SNS topic) for processing. + +## Development + +The Serverless Application Model Command Line Interface (SAM CLI) is an +extension of the AWS CLI that adds functionality for building and testing +Lambda applications. + +To use the SAM CLI, you need the following tools. 
+ +* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community) + +You may need the following for local testing. +* [Python 3 installed](https://www.python.org/downloads/) + +You will also need to configure your AWS credentials, if you have not already done so. + +## Creating a local build + +Use the SAM CLI to build and test your lambda locally. +Build your application with the `sam build` command. + +```bash +cd src/lambda_function/raw_sync/ +sam build +``` + +## Tests + +Tests are available in `tests/test_lambda_raw_sync.py`. diff --git a/src/lambda_function/raw_sync/app.py b/src/lambda_function/raw_sync/app.py new file mode 100644 index 00000000..4e46101b --- /dev/null +++ b/src/lambda_function/raw_sync/app.py @@ -0,0 +1,526 @@ +""" +Raw Sync Lambda + +This script verifies that the input and raw S3 buckets are synchronized. + +This is accomplished by verifying that all non-zero sized JSON in each export +in the input S3 bucket, excepting "Manifest.json", have a corresponding object +in the raw S3 bucket. Because we only download the central directory, typically +located near the end of a zip archive, verification can be done extremely quickly +and without needing to download most of the export. + +If a JSON file from an export is found to not have a corresponding object in the raw bucket, +the export is submitted to the raw Lambda (via the dispatch SNS topic) for processing. +""" + +import json +import logging +import os +import struct +import zipfile +from collections import defaultdict +from io import BytesIO +from typing import Optional + +import boto3 + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def lambda_handler(event: dict, context: dict) -> None: + """ + Entrypoint for this Lambda. + + Args: + event (dict) + context (dict): Information about the runtime environment and + the current invocation + """ + s3_client = boto3.client("s3") + input_bucket = os.environ.get("INPUT_S3_BUCKET") + input_key_prefix = os.environ.get("INPUT_S3_KEY_PREFIX") + raw_bucket = os.environ.get("RAW_S3_BUCKET") + raw_key_prefix = os.environ.get("RAW_S3_KEY_PREFIX") + dispatch_sns_arn = os.environ.get("SNS_TOPIC_ARN") + main( + event=event, + s3_client=s3_client, + input_bucket=input_bucket, + input_key_prefix=input_key_prefix, + raw_bucket=raw_bucket, + raw_key_prefix=raw_key_prefix, + dispatch_sns_arn=dispatch_sns_arn, + ) + + +def append_s3_key(key: str, key_format: str, result: dict) -> None: + """ + Organizes an S3 object key by appending it to the appropriate entry in the result dictionary + + This is a helper function for `list_s3_objects`. + + Args: + key (str): The S3 object key to process. + key_format (str): The format of the key, either "raw" or "input". + result (dict): The dictionary where keys are appended. For the "raw" format, it is a + nested dictionary structured as result[data_type][cohort]. For "input", + it is structured as result[cohort]. 
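+            For example (hypothetical keys), a "raw" key such as
+            `my-namespace/json/dataset=data_type_one/cohort=cohort_one/object_one.ndjson.gz`
+            is appended to result["data_type_one"]["cohort_one"], while an "input"
+            key such as `my-namespace/cohort_one/object_one` is appended to
+            result["cohort_one"].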
+ + Returns: + None + """ + result = result.copy() # shallow copy safe for append + if not key.endswith("/"): # Ignore keys that represent "folders" + key_components = key.split("/") + if key_format == "raw": + try: + data_type = next( + part.split("=")[1] + for part in key_components + if part.startswith("dataset=") + ) + cohort = next( + part.split("=")[1] + for part in key_components + if part.startswith("cohort=") + ) + result[data_type][cohort].append(key) + except StopIteration: + # Skip keys that don't match the expected pattern + return result + elif key_format == "input" and len(key_components) == 3: + cohort = key_components[1] + result[cohort].append(key) + return result + + +def list_s3_objects( + s3_client: boto3.client, bucket: str, key_prefix: str, key_format: str +) -> dict: + """ + Recursively list all objects under an S3 bucket and key prefix which + conform to a specified format. + + It's assumed that all objects under `key_prefix` have a key prefix + themselves which conforms to one of two formats: + + "input" format: `{namespace}/{cohort}/ + "raw" format: `{namespace}/json/dataset={data_type}/cohort={cohort}/` + + Args: + s3_client (boto3.client): An S3 client + bucket (str): The name of the S3 bucket. + key_prefix (str): The S3 key prefix to recursively list files from. + key_format (str): The format used by the keys, either "input" or "raw" + + Returns (dict): A dictionary where each hierarchy corresponds to the + ordering of the variables in the `key_format`, excepting {namespace}. + + For example: + + `key_format`="raw": + + { + "data_type_one": { + "cohort_one": [ + "object_one", + "object_two", + ... + ], + "cohort_two": [ + "object_one", + ... + ] + } + "data_type_two": { + ... + }, + ... + } + + `key_format`="input": + + { + "cohort_one": [ + "object_one", + "object_two", + ... + ], + "cohort_two": [ + "object_one", + ... + ] + } + """ + paginator = s3_client.get_paginator("list_objects_v2") + response_iterator = paginator.paginate(Bucket=bucket, Prefix=key_prefix) + if key_format == "raw": + result = defaultdict(lambda: defaultdict(list)) + elif key_format == "input": + result = defaultdict(list) + for response in response_iterator: + for obj in response.get("Contents", []): + key = obj["Key"] + result = append_s3_key( + key=key, + key_format=key_format, + result=result, + ) + return result + + +def match_corresponding_raw_object( + data_type: str, + cohort: str, + expected_key: str, + raw_keys: list[dict], +) -> Optional[str]: + """ + Find a matching raw object for a given export file and filename. + + Given a `namespace`, `cohort`, `data_type`, and `filename`, the matching + S3 key conforms to: + + `{namespace}/json/dataset={data_type}/cohort={cohort}/{file_identifier}.ndjson.gz` + + Args: + namespace (str): The namespace + data_type (str): The data type + cohort (str): The cohort name + file_identifier (str): The identifier of the original JSON file. The identifier is + the basename without any extensions. + expected_key (str): The key of the corresponding raw object. + raw_keys (dict): A dictionary formatted as the dictionary returned by `list_s3_objects`. + + Returns (str): The matching S3 key from `raw_keys`, or None if no match is found. 
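+
+    For example (hypothetical values): given data_type="data_type_one",
+    cohort="cohort_one", and an `expected_key` of
+    `my-namespace/json/dataset=data_type_one/cohort=cohort_one/object_one.ndjson.gz`,
+    the key is returned if it appears in raw_keys["data_type_one"]["cohort_one"];
+    otherwise None is returned.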
+ """ + logger.debug(f"Expecting to find matching object at {expected_key}") + + # Navigate through raw_keys to locate the correct `data_type` and `cohort` + if data_type in raw_keys: + if cohort in raw_keys[data_type]: + # Iterate through the list of keys under the specified `data_type`` and `cohort` + for key in raw_keys[data_type][cohort]: + if key == expected_key: + logger.debug(f"Found matching object {expected_key}") + return key + return None + + +def parse_content_range(content_range: str) -> tuple[int, ...]: + """ + Parse the ContentRange header to extract the start, end, and total size of the object. + + A helper function for `list_files_in_archive`. + + Args: + content_range (str): The ContentRange header value in the format 'bytes start-end/total'. + + Returns: + tuple: A tuple containing (range_start, range_end, total_size). + """ + # ContentRange format: 'bytes start-end/total' + _, range_info = content_range.split(" ") + range_start, range_end, total_size = map( + int, range_info.replace("-", "/").split("/") + ) + logger.info( + f"Read 0-indexed bytes from {range_start} to {range_end}, inclusive, " + f"out of {total_size} bytes." + ) + return range_start, range_end, total_size + + +def unpack_eocd_fields(body: bytes, eocd_offset: int) -> list[int]: + """ + Extract the End of Central Directory (EOCD) fields from the given body. + + A helper function for `list_files_in_archive`. + + The `unpack` method parses out: + + < - indicates Little-endian byte order + 4s - 4-byte string: + EOCD signature (4 bytes) + 4H - Four 2-byte unsigned short integers: + Number of this disk (2 bytes) + Disk where central directory starts (2 bytes) + Number of central directory records on this disk (2 bytes) + Total number of central directory records (2 bytes) + 2L - Two 4-byte unsigned long integers + Size of central directory (4 bytes) + Offset of start of central directory (4 bytes) + H - 2-byte unsigned short integer + Comment length (2 bytes) + + Args: + body (bytes): The byte content from which to extract EOCD fields. + eocd_offset (int): The offset position of the EOCD signature in the body. + + Returns: + tuple: A tuple containing (central_directory_offset, central_directory_size). + Both are int type. + """ + eocd_fields = struct.unpack("<4s4H2LH", body[eocd_offset : eocd_offset + 22]) + logger.debug(f"EOCD Record: {eocd_fields}") + central_directory_offset = eocd_fields[-2] + central_directory_size = eocd_fields[-3] + logger.debug(f"Central Directory Offset: {central_directory_offset}") + logger.debug(f"Central Directory Size: {central_directory_size}") + return central_directory_offset, central_directory_size + + +def determine_eocd_offset(body: bytes, content_range: str) -> int: + """ + Determine the offset of the End of Central Directory (EOCD) record in a given byte sequence. + + A helper function for `list_files_in_archive`. + + This function searches for the EOCD signature (`PK\x05\x06`) within the provided byte + sequence (`body`). + + Args: + body (bytes): The byte sequence in which to search for the EOCD signature. + content_range (str): A string representing the content range of the bytes. + This is used for logging purposes. + + Returns: + int: The offset of the EOCD signature within the provided byte sequence. Returns -1 if the + EOCD signature is not found, indicating that the EOCD is not present in the current + range of bytes. 
+ """ + eocd_signature = b"PK\x05\x06" + eocd_offset = body.rfind(eocd_signature) + logger.debug(f"Found EOCD offset: {eocd_offset}") + + # Check if EOCD is present, else try again with a bigger chunk of data + if eocd_offset == -1: + logger.info( + "Did not find the end of central directory record in " + f"ContentRange {content_range}." + ) + return eocd_offset + + +def list_files_in_archive( + s3_client: boto3.client, bucket: str, key: str, range_size=64 * 1024 +) -> list[str]: + """ + Recursively lists files in a ZIP archive stored as an S3 object. + + Files are filtered by the same criteria as the dispatch Lambda. + + This function: + + 1. Fetches the last `range_size` bytes of an S3 object (assumed to contain a ZIP + archive) in order to locate and parse the End of Central Directory (EOCD) record. + If the EOCD record is not contained in the bytes, the function calls itself + recursively with a larger range size. This scenario is expected to be rare, + only occurring if there is a comment at the end of the ZIP file exceeding + `range_size` - 22 bytes (the size of the EOCD record minus the optional comment). + If a non-zip file was provided, this process will repeat until the entire file + has been read, since this is the only way to determine that there is no EOCD record. + + 2. Having found the EOCD record, the offset of the central directory is determined. + If the central directory is not fully contained within the retrieved range, + the function will call itself with the appropriate `range_size`. + + 3. The function then reads the central directory to extract the file list. + + Args: + s3_client (boto3.client): The Boto3 S3 client used to fetch the object from S3. + bucket (str): The name of the S3 bucket where the object is stored. + key (str): The key of the S3 object containing the ZIP archive. + range_size (int): The number of bytes to fetch from the tail of the S3 object on each call. + Defaults to 64 KB. + + Returns: + list[dict]: A list of dict with information about the files contained within the ZIP archive. + The dict has keys `filename` and `file_size`, which contain the respective values from + the ZipInfo object. + + Files are filtered to exclude: + + - Directories (i.e., paths containing "/"). + - Files named "Manifest". + - Empty files (file size == 0). + + If no files match the criteria or the EOCD record is not found, an empty list is returned. + + Notes: + - The function may trigger multiple recursive calls if the EOCD record is large or non-existent, + but is guaranteed to terminate upon retrieving the entire object. 
+ """ + file_list = [] + object_response = s3_client.get_object( + Bucket=bucket, Key=key, Range=f"bytes=-{range_size}" + ) + logger.debug(f"Object Response: {object_response}") + + # Parse the ContentRange for later reference + range_start, range_end, total_size = parse_content_range( + content_range=object_response["ContentRange"] + ) + + # Determine end of central directory offset + tail = object_response["Body"].read() + eocd_offset = determine_eocd_offset( + body=tail, content_range=object_response["ContentRange"] + ) + if eocd_offset == -1: + adjusted_range_size = range_size * 2 + if adjusted_range_size > total_size * 2: + logger.error( + "Did not find an end of central directory record in " + f"s3://{bucket}/{key}" + ) + return [] + logger.warning( + f"Calling this function recursively with `range_size` = {adjusted_range_size}" + ) + return list_files_in_archive( + s3_client=s3_client, + bucket=bucket, + key=key, + range_size=adjusted_range_size, + ) + + # Extract the relevant EOCD fields + central_directory_offset, central_directory_size = unpack_eocd_fields( + body=tail, eocd_offset=eocd_offset + ) + + # Check if the entire central directory is contained within the fetched range + if ( + central_directory_offset < range_start + or central_directory_offset + central_directory_size > range_end + ): + logger.warning( + "The entire central directory is not contained in " + f"ContentRange {object_response['ContentRange']}." + ) + appropriate_range_size = total_size - central_directory_offset + logger.warning( + f"Calling this function recursively with `range_size` = {appropriate_range_size}" + ) + return list_files_in_archive( + s3_client=s3_client, + bucket=bucket, + key=key, + range_size=appropriate_range_size, + ) + + # Compile a list of file names which satisfy the same conditions used by dispatch Lambda + with zipfile.ZipFile(BytesIO(tail), "r") as zip_file: + for zip_info in zip_file.infolist(): + if ( + "/" not in zip_info.filename # necessary for pilot data only + and "Manifest" not in zip_info.filename + and zip_info.file_size > 0 + ): + file_object = { + "filename": zip_info.filename, + "file_size": zip_info.file_size, + } + file_list.append(file_object) + if len(file_list) == 0: + logger.warning( + f"Did not find any files in s3://{bucket}/{key} which " + "satisfy the conditions needed to be processed by the " + "raw Lambda." + ) + return file_list + + +def publish_to_sns( + bucket: str, key: str, path: str, file_size: int, sns_arn: str +) -> None: + """ + Publishes file information to an SNS topic. + + We use this function to publish a message to the dispatch SNS topic, allowing + the raw Lambda to process this file and write it as an object to the + raw S3 bucket. + + Args: + bucket (str): The input S3 bucket. + key (str): The S3 key of the export. + path (str): The file path within the export. + file_size (int): The size of the file in bytes. + sns_arn (str): The ARN of the dispatch SNS topic. 
+ + Returns: + None + """ + sns_client = boto3.client("sns") + file_info = { + "Bucket": bucket, + "Key": key, + "Path": path, + "FileSize": file_size, + } + logger.info(f"Publishing {file_info} to {sns_arn}") + sns_client.publish(TopicArn=sns_arn, Message=json.dumps(file_info)) + + +def main( + event: dict, + s3_client: boto3.client, + input_bucket: str, + input_key_prefix: str, + raw_bucket: str, + raw_key_prefix: str, + dispatch_sns_arn: str, +) -> None: + export_keys = list_s3_objects( + s3_client=s3_client, + bucket=input_bucket, + key_prefix=input_key_prefix, + key_format="input", + ) + raw_keys = list_s3_objects( + s3_client=s3_client, + bucket=raw_bucket, + key_prefix=raw_key_prefix, + key_format="raw", + ) + for export_key in sum(export_keys.values(), []): + # input bucket keys are formatted like `{namespace}/{cohort}/{export_basename}` + namespace, cohort = export_key.split("/")[:2] + file_list = list_files_in_archive( + s3_client=s3_client, + bucket=input_bucket, + key=export_key, + ) + for file_object in file_list: + filename = file_object["filename"] + logger.info( + f"Checking corresponding raw object for {filename} " + f"from s3://{input_bucket}/{export_key}" + ) + data_type = filename.split("_")[0] + file_identifier = filename.split(".")[0] + expected_key = ( + f"{namespace}/json/dataset={data_type}" + f"/cohort={cohort}/{file_identifier}.ndjson.gz" + ) + corresponding_raw_object = match_corresponding_raw_object( + data_type=data_type, + cohort=cohort, + expected_key=expected_key, + raw_keys=raw_keys, + ) + if corresponding_raw_object is None: + logger.info( + f"Did not find corresponding raw object for {filename} from " + f"s3://{input_bucket}/{export_key} at " + f"s3://{raw_bucket}/{expected_key}" + ) + publish_to_sns( + bucket=input_bucket, + key=export_key, + path=filename, + file_size=file_object["file_size"], + sns_arn=dispatch_sns_arn, + ) diff --git a/src/lambda_function/raw_sync/template.yaml b/src/lambda_function/raw_sync/template.yaml new file mode 100644 index 00000000..acfb4efc --- /dev/null +++ b/src/lambda_function/raw_sync/template.yaml @@ -0,0 +1,65 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 + +Description: > + SAM Template for the raw sync Lambda. The raw sync Lambda ensures that + the input and raw S3 buckets are synchronized by verifying that all non-zero + sized JSON in the exports in the input bucket have a corresponding object in + the raw bucket. + +Parameters: + + RoleArn: + Type: String + Description: ARN of the raw sync Lambda role. + + S3InputBucket: + Type: String + Description: Name of the input S3 bucket. + + S3InputKeyPrefix: + Type: String + Description: S3 key prefix where exports are written. + + S3RawBucket: + Type: String + Description: Name of the Raw S3 bucket. + + S3RawKeyPrefix: + Type: String + Description: S3 key prefix where files are written. + + SNSTopicArn: + Type: String + Description: The ARN of the dispatch SNS topic. 
+ + LambdaPythonVersion: + Type: String + Description: Python version to use for this lambda function + Default: "3.9" + +Resources: + RawSyncFunction: + Type: AWS::Serverless::Function + Properties: + PackageType: Zip + CodeUri: ./ + Handler: app.lambda_handler + Runtime: !Sub "python${LambdaPythonVersion}" + Role: !Ref RoleArn + MemorySize: 1024 + Timeout: 900 + Environment: + Variables: + INPUT_S3_BUCKET: !Ref S3InputBucket + INPUT_S3_KEY_PREFIX: !Ref S3InputKeyPrefix + RAW_S3_BUCKET: !Ref S3RawBucket + RAW_S3_KEY_PREFIX: !Ref S3RawKeyPrefix + SNS_TOPIC_ARN: !Ref SNSTopicArn + +Outputs: + RawSyncFunctionArn: + Description: Arn of the raw sync Lambda. + Value: !GetAtt RawSyncFunction.Arn + Export: + Name: !Sub "${AWS::Region}-${AWS::StackName}-RawSyncFunctionArn" diff --git a/templates/events-rule.yaml b/templates/events-rule.yaml new file mode 100644 index 00000000..3f95cce0 --- /dev/null +++ b/templates/events-rule.yaml @@ -0,0 +1,47 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: Cloudwatch Events rule to trigger a Lambda on a cron schedule. + +Parameters: + + RuleName: + Type: String + Description: Name of this rule. + + RuleState: + Type: String + Description: > + Whether to enable this rule upon deployment. Suggested values + are "DISABLED" or "ENABLED". + + LambdaArn: + Type: String + Description: ARN of the Lambda to trigger. + + CronSchedule: + Type: String + Description: When to trigger the Lambda + Default: "cron(0 0 * * ? *)" # At midnight UTC every day + +Resources: + MyCloudWatchRule: + Type: AWS::Events::Rule + Properties: + Name: !Ref RuleName + State: !Ref RuleState + ScheduleExpression: !Ref CronSchedule + Targets: + - Arn: !Ref LambdaArn + Id: LambdaTarget + + LambdaInvokePermission: + Type: AWS::Lambda::Permission + Properties: + Action: lambda:InvokeFunction + FunctionName: !Ref LambdaArn + Principal: events.amazonaws.com + SourceArn: !GetAtt MyCloudWatchRule.Arn + +Outputs: + RuleArn: + Description: "ARN of the Cloudwatch events rule" + Value: !GetAtt MyCloudWatchRule.Arn diff --git a/templates/lambda-raw-sync-role.yaml b/templates/lambda-raw-sync-role.yaml new file mode 100644 index 00000000..5c4da5df --- /dev/null +++ b/templates/lambda-raw-sync-role.yaml @@ -0,0 +1,71 @@ +AWSTemplateFormatVersion: '2010-09-09' + +Transform: AWS::Serverless-2016-10-31 + +Description: > + An IAM Role for the raw sync Lambda + +Parameters: + S3SourceBucketName: + Type: String + Description: Name of the S3 bucket where exports are deposited. + + S3TargetBucketName: + Type: String + Description: Name of the S3 bucket where raw JSON is written to. + + SNSTopicArn: + Type: String + Description: > + ARN of the SNS topic where files found not to have a corresponding + object in the target bucket will be published to for processing. 
+ +Resources: + RawRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: ReadS3 + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:List* + Resource: + - !Sub arn:aws:s3:::${S3SourceBucketName} + - !Sub arn:aws:s3:::${S3SourceBucketName}/* + - !Sub arn:aws:s3:::${S3TargetBucketName} + - !Sub arn:aws:s3:::${S3TargetBucketName}/* + - PolicyName: PublishToSNS + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sns:Publish + Resource: + - !Ref SNSTopicArn + +Outputs: + RoleName: + Value: !Ref RawRole + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-RoleName' + + RoleArn: + Value: !GetAtt RawRole.Arn + Export: + Name: !Sub '${AWS::Region}-${AWS::StackName}-RoleArn' diff --git a/tests/test_lambda_raw_sync.py b/tests/test_lambda_raw_sync.py new file mode 100644 index 00000000..227db508 --- /dev/null +++ b/tests/test_lambda_raw_sync.py @@ -0,0 +1,614 @@ +import io +import json +import struct +import zipfile +from collections import defaultdict +from unittest.mock import MagicMock, patch + +import boto3 +import pytest +from moto import mock_s3, mock_sns, mock_sqs + +import src.lambda_function.raw_sync.app as app # Replace with the actual module name + + +@pytest.fixture +def s3_client(): + """Fixture to create a mocked S3 client.""" + with mock_s3(): + s3 = boto3.client("s3", region_name="us-east-1") + yield s3 + + +@pytest.fixture +def out_of_range_central_directory(): + central_directory = ( + b"\x50\x4b\x01\x02" # Central file header signature (4 bytes) + + b"\x14\x00" # Version made by (2 bytes) + + b"\x0A\x00" # Version needed to extract (2 bytes) + + b"\x00\x00" # General purpose bit flag (2 bytes) + + b"\x00\x00" # Compression method (2 bytes, 0 = no compression) + + b"\x00\x00" # File last modification time (2 bytes) + + b"\x00\x00" # File last modification date (2 bytes) + + b"\x00\x00\x00\x00" # CRC-32 (4 bytes, placeholder value) + + b"\x00\x00\x00\x00" # Compressed size (4 bytes, placeholder value) + + b"\x00\x00\x00\x00" # Uncompressed size (4 bytes, placeholder value) + + b"\x08\x00" # File name length (2 bytes, "test.txt" is 8 bytes) + + b"\x00\x00" # Extra field length (2 bytes, 0 bytes) + + b"\x00\x00" # File comment length (2 bytes, 0 bytes) + + b"\x00\x00" # Disk number start (2 bytes) + + b"\x00\x00" # Internal file attributes (2 bytes) + + b"\x00\x00\x00\x00" # External file attributes (4 bytes) + + b"\x00\x00\x00\x00" # Relative offset of local header (4 bytes, placeholder value) + + b"test.txt" # File name ("test.txt") + ) + return central_directory + + +@pytest.fixture +def out_of_range_eocd_record(out_of_range_central_directory): + return ( + b"PK\x05\x06" + + b"\x00" * 8 + + struct.pack(" 1 + ), "Function was not called recursively." 
+ + +def test_list_files_in_archive_no_eocd_returns_empty_list( + s3_client, setup_list_files_in_archive_s3, caplog +): + """Test if the function returns an empty list when EOCD is not found at all.""" + bucket_name = "list-files-in-archive-bucket" + key = "no_eocd.notazip" + with caplog.at_level("ERROR"): + result = app.list_files_in_archive(s3_client, bucket_name, key, range_size=16) + assert result == [] + assert len(caplog.text) + + +@patch("src.lambda_function.raw_sync.app.list_files_in_archive") +def test_list_files_in_archive_recursive_central_directory_out_of_range( + mock_list_files_in_archive, + s3_client, + setup_list_files_in_archive_s3, + out_of_range_eocd_record, +): + """Test if the function is called recursively when central directory is not fully contained.""" + bucket_name = "list-files-in-archive-bucket" + key = "central_directory_out_of_range.zip" + + # Call the original function + mock_list_files_in_archive.side_effect = ( + lambda *args, **kwargs: unpatched_list_files_in_archive(*args, **kwargs) + ) + + # Set `range_size` to be just slightly larger than the EOCD record + # This way we won't get the entire central directory, initially + result = app.list_files_in_archive( + s3_client, bucket_name, key, range_size=len(out_of_range_eocd_record) + 2 + ) + + # Check that the function was called recursively + assert ( + mock_list_files_in_archive.call_count > 1 + ), "Function was not called recursively when central directory was out of range." + + +def test_list_files_in_archive_returns_filenames( + s3_client, setup_list_files_in_archive_s3 +): + """Test if the function returns a list of filenames.""" + bucket_name = "list-files-in-archive-bucket" + key = "valid.zip" + + result = app.list_files_in_archive( + s3_client=s3_client, bucket=bucket_name, key=key, range_size=64 * 1024 + ) + + assert isinstance(result, list), "Expected result to be a list." + assert len(result) > 0, "Expected list to contain filenames." + assert "filename" in result[0] + assert "file_size" in result[0] + filenames = [file_object["filename"] for file_object in result] + assert "file1.txt" in filenames, "Expected 'file1.txt' to be in the result." + assert "file2.txt" in filenames, "Expected 'file2.txt' to be in the result." + + +def test_list_files_in_archive_empty_zip( + s3_client, setup_list_files_in_archive_s3, caplog +): + """ + Test that an empty ZIP archive returns an empty file list. 
+ """ + # Retrieve the bucket name from the fixture + bucket_name = setup_list_files_in_archive_s3 + key = "empty.zip" + + with caplog.at_level("WARNING"): + result = app.list_files_in_archive(s3_client, bucket=bucket_name, key=key) + + assert result == [] + assert len(caplog.text) + + +def test_append_s3_key_raw(): + """Test append_s3_key with 'raw' format.""" + key = "namespace/json/dataset=example_data/cohort=example_cohort/file1.json" + key_format = "raw" + result = defaultdict(lambda: defaultdict(list)) + + # Set up the result dictionary with nested structure + app.append_s3_key(key, key_format, result) + + # Expected result structure after processing the key + result = expected_result = {"example_data": {"example_cohort": [key]}} + + # Assert that the key was correctly added to the result dictionary + assert result == expected_result + + +def test_append_s3_key_input(): + """Test append_s3_key with 'input' format.""" + key = "namespace/example_cohort/file1.json" + key_format = "input" + result = defaultdict(list) + + # Set up the result dictionary with the flat structure + app.append_s3_key(key, key_format, result) + + # Expected result structure after processing the key + result = expected_result = {"example_cohort": [key]} + + # Assert that the key was correctly added to the result dictionary + assert result == expected_result + + +def test_append_s3_key_stop_iteration(): + """Test that result is unmodified when StopIteration is encountered.""" + key = "namespace/json/invalid_key_structure" + key_format = "raw" + + # Initial result dictionary (should remain unchanged) + result = { + "data_type_one": { + "cohort_one": [ + "namespace/json/dataset=data_type_one/cohort=cohort_one/file1.json" + ] + } + } + # Copy the result to check for modifications later + original_result = result.copy() + + result = app.append_s3_key(key, key_format, result) + + assert ( + result == original_result + ), "Expected result to remain unmodified on StopIteration." 
+ + +def test_list_s3_objects_raw_format(s3_client, setup_s3): + """Test the list_s3_objects function with the "raw" key format.""" + bucket_name = "test-raw-bucket" + key_prefix = "namespace/json/" + key_format = "raw" + + expected_output = { + "data_type_one": { + "cohort_one": [ + "namespace/json/dataset=data_type_one/cohort=cohort_one/object_one.ndjson.gz" + ], + "cohort_two": [ + "namespace/json/dataset=data_type_one/cohort=cohort_two/object_two.ndjson.gz" + ], + }, + "data_type_two": { + "cohort_one": [ + "namespace/json/dataset=data_type_two/cohort=cohort_one/object_three.ndjson.gz" + ], + }, + } + + result = app.list_s3_objects(s3_client, bucket_name, key_prefix, key_format) + assert result == expected_output, f"Expected {expected_output}, but got {result}" + + +def test_list_s3_objects_input_format(s3_client, setup_s3): + """Test the list_s3_objects function with the "input" key format.""" + bucket_name = "test-input-bucket" + key_prefix = "namespace/" + key_format = "input" + + expected_output = { + "cohort_one": [ + "namespace/cohort_one/object_one", + "namespace/cohort_one/object_two", + ], + "cohort_two": ["namespace/cohort_two/object_one"], + } + + result = app.list_s3_objects(s3_client, bucket_name, key_prefix, key_format) + assert result == expected_output, f"Expected {expected_output}, but got {result}" + + +def test_match_corresponding_raw_object_found(mocked_raw_keys): + """Test when a matching key is found.""" + namespace = "namespace" + data_type = "data_type_one" + cohort = "cohort_one" + file_identifier = "object_one" + + # Expected matching key + expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/{file_identifier}.ndjson.gz" + + result = app.match_corresponding_raw_object( + data_type=data_type, + cohort=cohort, + expected_key=expected_key, + raw_keys=mocked_raw_keys, + ) + assert result == expected_key + + +def test_match_corresponding_raw_object_non_matching_data_type(mocked_raw_keys): + """Test when there is no match due to a non-matching data type.""" + namespace = "namespace" + data_type = "fake_data_type" + cohort = "cohort_one" + file_identifier = "object_one" + + # Expected matching key + expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/{file_identifier}.ndjson.gz" + + result = app.match_corresponding_raw_object( + data_type=data_type, + cohort=cohort, + expected_key=expected_key, + raw_keys=mocked_raw_keys, + ) + assert result is None + + +def test_match_corresponding_raw_object_non_matching_cohort(mocked_raw_keys): + """Test when there is no match due to a non-matching cohort.""" + namespace = "namespace" + data_type = "data_type_one" + cohort = "fake_cohort" + file_identifier = "object_four" + + # Expected matching key + expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/{file_identifier}.ndjson.gz" + + result = app.match_corresponding_raw_object( + data_type=data_type, + cohort=cohort, + expected_key=expected_key, + raw_keys=mocked_raw_keys, + ) + assert result is None + + +def test_match_corresponding_raw_object_non_matching_file_identifier(mocked_raw_keys): + """Test when there is no match due to a non-matching file identifier.""" + namespace = "namespace" + data_type = "data_type_one" + cohort = "cohort_one" + file_identifier = "nonexistent_file" + + # Expected matching key + expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/{file_identifier}.ndjson.gz" + + result = app.match_corresponding_raw_object( + data_type=data_type, + cohort=cohort, + expected_key=expected_key, + 
raw_keys=mocked_raw_keys, + ) + assert result is None + + +@mock_sns +@mock_sqs +def test_publish_to_sns_with_sqs_subscription(): + """ + Test publish_to_sns by subscribing an SQS queue to an SNS topic and verifying + that the message was delivered to the SQS queue. + """ + + # Step 1: Set up the mock SNS environment + sns_client = boto3.client("sns", region_name="us-east-1") + response = sns_client.create_topic(Name="test-topic") + sns_arn = response["TopicArn"] + + # Step 2: Set up the mock SQS environment + sqs_client = boto3.client("sqs", region_name="us-east-1") + sqs_response = sqs_client.create_queue(QueueName="test-queue") + sqs_url = sqs_response["QueueUrl"] + + # Get the SQS queue ARN + sqs_arn_response = sqs_client.get_queue_attributes( + QueueUrl=sqs_url, AttributeNames=["QueueArn"] + ) + sqs_arn = sqs_arn_response["Attributes"]["QueueArn"] + + # Step 3: Subscribe the SQS queue to the SNS topic + sns_client.subscribe(TopicArn=sns_arn, Protocol="sqs", Endpoint=sqs_arn) + + # Step 4: Input parameters for the function + bucket = "test-bucket" + key = "export/file1.json" + path = "file1.json" + file_size = 12345 + + # Step 5: Call the function to publish to SNS + app.publish_to_sns(bucket, key, path, file_size, sns_arn) + + # Step 6: Poll the SQS queue for the message + messages = sqs_client.receive_message(QueueUrl=sqs_url, MaxNumberOfMessages=1) + + # Step 7: Assert the message was published to the SQS queue + assert "Messages" in messages, "No messages were found in the SQS queue." + + # Step 8: Verify the message content + message_body = json.loads(messages["Messages"][0]["Body"]) + sns_message = json.loads(message_body["Message"]) + + # Expected message payload + expected_message = { + "Bucket": bucket, + "Key": key, + "Path": path, + "FileSize": file_size, + } + + assert ( + sns_message == expected_message + ), f"Expected {expected_message}, got {sns_message}" + + # Step 9: Cleanup (delete the message from the queue to avoid polluting future tests) + sqs_client.delete_message( + QueueUrl=sqs_url, ReceiptHandle=messages["Messages"][0]["ReceiptHandle"] + )