diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index ef2de12a1..dfeda0211 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -12,10 +12,10 @@ jobs: with: fetch-depth: 0 # This causes all history to be fetched, which is required for calculate-version to function - - name: Install Python 3.8 + - name: Install Python 3.9 uses: actions/setup-python@v1 with: - python-version: 3.8 + python-version: 3.9 - name: Upgrade python pip run: python -m pip install --upgrade pip diff --git a/ack_backend/src/ack_processor.py b/ack_backend/src/ack_processor.py index 4808236c5..8c2e36b1f 100644 --- a/ack_backend/src/ack_processor.py +++ b/ack_backend/src/ack_processor.py @@ -1,11 +1,16 @@ """Ack lambda handler""" import json +from utils_for_ack_lambda import get_environment from typing import Union from logging_decorators import ack_lambda_handler_logging_decorator, convert_messsage_to_ack_row_logging_decorator from update_ack_file import update_ack_file, create_ack_data +from clients import s3_client +ENVIRONMENT = get_environment() +SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources" + @convert_messsage_to_ack_row_logging_decorator def convert_message_to_ack_row(message, created_at_formatted_string): """ @@ -50,7 +55,7 @@ def lambda_handler(event, context): created_at_formatted_string = None array_of_rows = [] - + for i, record in enumerate(event["Records"]): try: @@ -66,7 +71,13 @@ def lambda_handler(event, context): for message in incoming_message_body: array_of_rows.append(convert_message_to_ack_row(message, created_at_formatted_string)) + row_count = get_row_count_stream(SOURCE_BUCKET_NAME, f"processing/{file_key}") + update_ack_file(file_key, created_at_formatted_string=created_at_formatted_string, ack_data_rows=array_of_rows, row_count=row_count) + return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")} - update_ack_file(file_key, created_at_formatted_string=created_at_formatted_string, ack_data_rows=array_of_rows) - return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")} +def get_row_count_stream(bucket_name, key): + response = s3_client.get_object(Bucket=bucket_name, Key=key) + count = sum(1 for line in response['Body'].iter_lines() if line.strip()) + + return count \ No newline at end of file diff --git a/ack_backend/src/audit_table.py b/ack_backend/src/audit_table.py new file mode 100644 index 000000000..6c7f247de --- /dev/null +++ b/ack_backend/src/audit_table.py @@ -0,0 +1,67 @@ +"""Add the filename to the audit table and check for duplicates.""" + +import os +from typing import Union +from boto3.dynamodb.conditions import Key,Attr +from clients import dynamodb_client, dynamodb_resource, logger +from errors import UnhandledAuditTableError + + +def update_audit_table_status(file_key: str) -> str: + """ + Update the status in the audit table. 
+    Returns the queue name of the processed file.
+    """
+    try:
+        table_name = os.environ["AUDIT_TABLE_NAME"]
+        file_name_gsi = "filename_index"
+        file_name_response = dynamodb_resource.Table(table_name).query(
+            IndexName=file_name_gsi, KeyConditionExpression=Key("filename").eq(file_key)
+        )
+        items = file_name_response.get("Items", [])
+        message_id = items[0].get("message_id")
+        queue_name = items[0].get("queue_name")
+        # Update the file's status in the audit table
+        dynamodb_client.update_item(
+            TableName=table_name,
+            Key={"message_id": {"S": message_id}},
+            UpdateExpression="SET #status = :status",
+            ExpressionAttributeNames={"#status": "status"},
+            ExpressionAttributeValues={":status": {"S": "Processed"}},
+            ConditionExpression="attribute_exists(message_id)",
+        )
+        logger.info("%s file, with message id %s, status successfully updated in audit table", file_key, message_id)
+        return queue_name
+    except Exception as error:  # pylint: disable = broad-exception-caught
+        error_message = f"Error updating the status of {file_key} in the audit table: {error}"
+        logger.error(error_message)
+        raise UnhandledAuditTableError(error_message) from error
+
+def get_queued_file_details(queue_name: str) -> tuple[Union[None, str], Union[None, str]]:
+    """
+    Check the audit table for queued files and return the details of the oldest one.
+    Returns a tuple in the format (file_name, message_id) for the oldest queued file.
+    Defaults to (None, None) if no file is found in 'Queued' status.
+    """
+    table_name = os.environ["AUDIT_TABLE_NAME"]
+    queue_name_gsi = "queue_name_index"
+
+    queue_response = dynamodb_resource.Table(table_name).query(
+        IndexName=queue_name_gsi,
+        KeyConditionExpression=Key("queue_name").eq(queue_name)
+        & Key("status").eq("Queued"),
+    )
+    if queue_response["Items"]:
+        file_name, message_id = get_file_name(queue_response)
+        return file_name, message_id
+    else:
+        return None, None
+
+def get_file_name(queue_response: dict) -> tuple[str, str]:
+    """
+    Returns (file_name, message_id) for the oldest queued file, determined by the earliest timestamp.
+ """ + sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"]) + first_record = sorted_item[0] + file_name = first_record.get("filename") + message_id = first_record.get("message_id") + return file_name, message_id \ No newline at end of file diff --git a/ack_backend/src/clients.py b/ack_backend/src/clients.py index e7bff9331..ac7cdc69e 100644 --- a/ack_backend/src/clients.py +++ b/ack_backend/src/clients.py @@ -1,12 +1,15 @@ """Initialise clients and logger""" import logging -from boto3 import client as boto3_client +from boto3 import client as boto3_client, resource as boto3_resource REGION_NAME = "eu-west-2" s3_client = boto3_client("s3", region_name=REGION_NAME) firehose_client = boto3_client("firehose", region_name=REGION_NAME) +lambda_client = boto3_client('lambda', region_name=REGION_NAME) +dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME) +dynamodb_resource = boto3_resource("dynamodb", region_name=REGION_NAME) # Logger diff --git a/ack_backend/src/errors.py b/ack_backend/src/errors.py new file mode 100644 index 000000000..6f33274bf --- /dev/null +++ b/ack_backend/src/errors.py @@ -0,0 +1,29 @@ +"""Custom exceptions for the Filename Processor.""" + + +class DuplicateFileError(Exception): + """A custom exception for when it is identified that the file is a duplicate.""" + + +class ProcessingError(Exception): + """A custom exception for when it is identified that supplier_vaccine file is under processing""" + + +class UnhandledAuditTableError(Exception): + """A custom exception for when an unexpected error occurs whilst adding the file to the audit table.""" + + +class VaccineTypePermissionsError(Exception): + """A custom exception for when the supplier does not have the necessary vaccine type permissions.""" + + +class InvalidFileKeyError(Exception): + """A custom exception for when the file key is invalid.""" + + +class InvalidSupplierError(Exception): + """A custom exception for when the supplier has not been correctly identified.""" + + +class UnhandledSqsError(Exception): + """A custom exception for when an unexpected error occurs whilst sending a message to SQS.""" diff --git a/ack_backend/src/update_ack_file.py b/ack_backend/src/update_ack_file.py index 4ae8abd6d..7149c1b8c 100644 --- a/ack_backend/src/update_ack_file.py +++ b/ack_backend/src/update_ack_file.py @@ -1,12 +1,17 @@ """Functions for adding a row of data to the ack file""" import os +import json from io import StringIO, BytesIO from typing import Union from botocore.exceptions import ClientError from constants import Constants -from clients import s3_client, logger - +from audit_table import update_audit_table_status, get_queued_file_details +from clients import s3_client, logger, lambda_client +#TODO move to constants +ENVIRONMENT = os.getenv("ENVIRONMENT") +SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources" +FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME") def create_ack_data( created_at_formatted_string: str, @@ -63,7 +68,8 @@ def obtain_current_ack_content(ack_bucket_name: str, ack_file_key: str) -> Strin def upload_ack_file( - ack_bucket_name: str, ack_file_key: str, accumulated_csv_content: StringIO, ack_data_row: any + ack_bucket_name: str, ack_file_key: str, accumulated_csv_content: StringIO, ack_data_row: any, row_count: int, archive_ack_file_key: str, file_key: str + , created_at_formatted_string: str ) -> None: """Adds the data row to the uploaded ack file""" for row in ack_data_row: @@ -72,12 +78,71 @@ def upload_ack_file( 
accumulated_csv_content.write(cleaned_row + "\n")
     csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
     s3_client.upload_fileobj(csv_file_like_object, ack_bucket_name, ack_file_key)
-    logger.info("Ack file updated to %s: %s", ack_bucket_name, ack_file_key)
+    row_count_dest = get_row_count_stream(ack_bucket_name, ack_file_key)
+    if row_count == row_count_dest:
+        move_file(ack_bucket_name, ack_file_key, archive_ack_file_key)
+        source_key = f"processing/{file_key}"
+        destination_key = f"archive/{file_key}"
+        move_file(SOURCE_BUCKET_NAME, source_key, destination_key)
+        queue_name = update_audit_table_status(file_key)
+        file_key, message_id = get_queued_file_details(queue_name)
+        if file_key is not None and message_id is not None:
+            # Directly invoke the Lambda function for the next queued file
+            invoke_filename_lambda(SOURCE_BUCKET_NAME, file_key, message_id)
+
+    logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)
 
 
-def update_ack_file(file_key: str, created_at_formatted_string: str, ack_data_rows: any) -> None:
+def update_ack_file(
+    file_key: str,
+    created_at_formatted_string: str,
+    ack_data_rows: any,
+    row_count: int,
+) -> None:
     """Updates the ack file with the new data row based on the given arguments"""
-    ack_file_key = f"forwardedFile/{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+    ack_file_key = f"TempAck/{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+    archive_ack_file_key = f"forwardedFile/{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
     ack_bucket_name = os.getenv("ACK_BUCKET_NAME")
     accumulated_csv_content = obtain_current_ack_content(ack_bucket_name, ack_file_key)
-    upload_ack_file(ack_bucket_name, ack_file_key, accumulated_csv_content, ack_data_rows)
+    upload_ack_file(ack_bucket_name, ack_file_key, accumulated_csv_content, ack_data_rows, row_count, archive_ack_file_key, file_key, created_at_formatted_string)
+
+def get_row_count_stream(bucket_name: str, key: str) -> int:
+    """Returns the number of lines in the given S3 object, read as a stream."""
+    response = s3_client.get_object(Bucket=bucket_name, Key=key)
+    count = sum(1 for _ in response['Body'].iter_lines())
+    return count
+
+def move_file(bucket_name: str, source_key: str, destination_key: str) -> None:
+    """Moves a file from one location to another in S3 by copying and then deleting it.
+
+    Args:
+        bucket_name (str): Name of the S3 bucket.
+        source_key (str): Source file key.
+        destination_key (str): Destination file key.
+    """
+    s3_client.copy_object(
+        Bucket=bucket_name,
+        CopySource={"Bucket": bucket_name, "Key": source_key},
+        Key=destination_key
+    )
+    s3_client.delete_object(Bucket=bucket_name, Key=source_key)
+    logger.info("File moved from %s to %s", source_key, destination_key)
+
+
+def invoke_filename_lambda(source_bucket_name: str, file_key: str, message_id: str) -> None:
+    """Invokes the filename processor lambda, passing an S3-style 'Records' payload for the given file."""
+    lambda_payload = {
+        "Records": [
+            {
+                "s3": {
+                    "bucket": {"name": source_bucket_name},
+                    "object": {"key": file_key},
+                },
+                "message_id": message_id,
+            }
+        ]
+    }
+    lambda_client.invoke(
+        FunctionName=FILE_NAME_PROC_LAMBDA_NAME,
+        InvocationType="Event",
+        Payload=json.dumps(lambda_payload),
+    )
\ No newline at end of file
diff --git a/ack_backend/src/utils_for_ack_lambda.py b/ack_backend/src/utils_for_ack_lambda.py
new file mode 100644
index 000000000..c2efdb4b6
--- /dev/null
+++ b/ack_backend/src/utils_for_ack_lambda.py
@@ -0,0 +1,10 @@
+"""Utils for ack lambda"""
+
+import os
+
+
+def get_environment() -> str:
+    """Returns the current environment. 
Defaults to internal-dev for pr and user environments""" + _env = os.getenv("ENVIRONMENT") + # default to internal-dev for pr and user environments + return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev" \ No newline at end of file diff --git a/azure/azure-release-pipeline.yml b/azure/azure-release-pipeline.yml index a366fe95a..5b29903a1 100644 --- a/azure/azure-release-pipeline.yml +++ b/azure/azure-release-pipeline.yml @@ -76,5 +76,5 @@ extends: - int - ref jinja_templates: - DOMAIN_ENDPOINT: https://prod.imms.prod.vds.platform.nhs.uk + DOMAIN_ENDPOINT: https://blue.imms.prod.vds.platform.nhs.uk diff --git a/backend/tests/test_fhir_controller.py b/backend/tests/test_fhir_controller.py index baccf40de..711c678c9 100644 --- a/backend/tests/test_fhir_controller.py +++ b/backend/tests/test_fhir_controller.py @@ -918,6 +918,7 @@ def test_create_immunization_for_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "create" }, "body": imms.json(), } @@ -936,6 +937,7 @@ def test_create_immunization_for_batch(self, mock_send_message): "row_id": aws_event["headers"]["row_id"], "created_at_formatted_string": aws_event["headers"]["created_at_formatted_string"], "local_id": aws_event["headers"]["local_id"], + "operation_requested": "create" } ) @@ -961,6 +963,7 @@ def test_create_immunization_for_unauthorized(self): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "create" }, "body": imms.json(), } @@ -1067,6 +1070,7 @@ def test_duplicate_record_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "create" }, "body": imms.json(), } @@ -1108,6 +1112,7 @@ def test_pds_unhandled_error_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "create" }, "body": imms.json(), } @@ -1165,6 +1170,7 @@ def test_update_immunization_etag_missing(self, mock_sqs_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": imms, "pathParameters": {"id": imms_id}, @@ -1192,6 +1198,7 @@ def test_update_immunization_duplicate(self, mock_sqs_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": imms, "pathParameters": {"id": imms_id}, @@ -1222,6 +1229,7 @@ def test_update_immunization_UnauthorizedVaxError(self): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": imms, "pathParameters": {"id": imms_id}, @@ -1277,6 +1285,7 @@ def test_update_immunization_for_batch_existing_record_is_none(self, mock_sqs_me "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": imms, "pathParameters": {"id": imms_id}, @@ -1314,6 +1323,7 @@ def test_update_immunization_for_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": imms, "pathParameters": {"id": imms_id}, @@ -1336,6 +1346,7 @@ def 
test_update_immunization_for_batch(self, mock_send_message): "row_id": aws_event["headers"]["row_id"], "created_at_formatted_string": aws_event["headers"]["created_at_formatted_string"], "local_id": aws_event["headers"]["local_id"], + "operation_requested": "update" } ) @@ -1473,6 +1484,7 @@ def test_validation_error_for_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": imms, "pathParameters": {"id": "valid-id"}, @@ -1592,6 +1604,7 @@ def test_inconsistent_imms_id_for_batch(self, mock_sqs_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "body": bad_json, "pathParameters": {"id": "an-id"}, @@ -1663,6 +1676,7 @@ def test_validate_imms_id_for_batch(self, mock_sqs_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "update" }, "pathParameters": {"id": "invalid %$ id"}, "body": valid_json, @@ -1706,6 +1720,7 @@ def test_validate_imms_id_for_batch(self, mock_sqs_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "delete" }, } @@ -1765,6 +1780,7 @@ def test_delete_immunization_unauthorised_vax(self, mock_sqs_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "delete" }, "pathParameters": {"id": imms_id}, } @@ -1790,6 +1806,7 @@ def test_delete_immunization_for_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "delete" }, "pathParameters": {"id": imms_id}, } @@ -1807,6 +1824,7 @@ def test_delete_immunization_for_batch(self, mock_send_message): "row_id": lambda_event["headers"]["row_id"], "created_at_formatted_string": lambda_event["headers"]["created_at_formatted_string"], "local_id": lambda_event["headers"]["local_id"], + "operation_requested": "delete" } ) @@ -1850,6 +1868,7 @@ def test_immunization_exception_not_found_for_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "delete" }, "pathParameters": {"id": "a-non-existing-id"}, } @@ -1897,6 +1916,7 @@ def test_immunization_unhandled_error_for_batch(self, mock_send_message): "row_id": "123", "created_at_formatted_string": "2020-01-01", "local_id": ValidValues.test_local_id, + "operation_requested": "delete" }, "pathParameters": {"id": "a-non-existing-id"}, } diff --git a/delta_backend/src/delta.py b/delta_backend/src/delta.py index 65d18fe3f..059d98155 100644 --- a/delta_backend/src/delta.py +++ b/delta_backend/src/delta.py @@ -65,7 +65,7 @@ def handler(event, context): imms_id = new_image["PK"]["S"].split("#")[1] vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"]) supplier_system = new_image["SupplierSystem"]["S"] - if supplier_system not in ("DPS_FULL", "DPS_REDUCED"): + if supplier_system not in ("DPSFULL", "DPSREDUCED"): operation = new_image["Operation"]["S"] if operation == "CREATE": operation = "NEW" diff --git a/filenameprocessor/src/audit_table.py b/filenameprocessor/src/audit_table.py index 3870faeb8..c43f80032 100644 --- a/filenameprocessor/src/audit_table.py +++ b/filenameprocessor/src/audit_table.py @@ -6,41 +6,123 @@ from errors import 
DuplicateFileError, UnhandledAuditTableError
 
 
-def add_to_audit_table(message_id: str, file_key: str, created_at_formatted_str: str) -> None:
+def upsert_audit_table(
+    message_id: str,
+    file_key: str,
+    created_at_formatted_str: str,
+    queue_name: str,
+    process_status: str,
+    query_type: str,
+) -> bool:
     """
-    Adds the filename to the audit table.
+    Adds or updates the filename in the audit table.
     Raises an error if the file is a duplicate (after adding it to the audit table).
+    Returns True if the file should be sent for processing, or False if it has been queued.
     """
     try:
         table_name = os.environ["AUDIT_TABLE_NAME"]
-        file_name_gsi = os.environ["FILE_NAME_GSI"]
-
+        file_name_gsi = "filename_index"
+        queue_name_gsi = "queue_name_index"
+        processing_exists = False
+        if query_type == "update":
+            dynamodb_client.update_item(
+                TableName=table_name,
+                Key={"message_id": {"S": message_id}},
+                UpdateExpression="SET #status = :status",
+                ExpressionAttributeNames={"#status": "status"},
+                ExpressionAttributeValues={":status": {"S": "Processing"}},
+                ConditionExpression="attribute_exists(message_id)",
+            )
+            logger.info(
+                "%s file set for processing, and the status successfully updated in audit table",
+                file_key,
+            )
+            return True
         # Check for duplicates before adding to the table (if the query returns any items, then the file is a duplicate)
         file_name_response = dynamodb_resource.Table(table_name).query(
             IndexName=file_name_gsi, KeyConditionExpression=Key("filename").eq(file_key)
         )
-
         duplicate_exists = bool(file_name_response.get("Items"))
+        # Check whether a file is already being processed for this supplier_vaccine combination; if so, queue this file
+        if not duplicate_exists and process_status != "Processed":
+            queue_response = dynamodb_resource.Table(table_name).query(
+                IndexName=queue_name_gsi,
+                KeyConditionExpression=Key("queue_name").eq(queue_name)
+                & Key("status").eq(process_status),
+            )
+            if queue_response["Items"]:
+                process_status = "Queued"
+                processing_exists = True
+
         # Add to the audit table (regardless of whether it is a duplicate)
         dynamodb_client.put_item(
             TableName=table_name,
             Item={
                 "message_id": {"S": message_id},
                 "filename": {"S": file_key},
-                "status": {"S": "Not processed - duplicate" if duplicate_exists else "Processed"},
+                "queue_name": {"S": queue_name},
+                "status": {
+                    "S": (
+                        "Not processed - duplicate"
+                        if duplicate_exists
+                        else process_status
+                    )
+                },
                 "timestamp": {"S": created_at_formatted_str},
             },
             ConditionExpression="attribute_not_exists(message_id)",  # Prevents accidental overwrites
         )
-        logger.info("%s file, with message id %s, successfully added to audit table", file_key, message_id)
+        logger.info(
+            "%s file, with message id %s, successfully added to audit table",
+            file_key,
+            message_id,
+        )
+        # If a duplicate exists, raise an exception
+        if duplicate_exists:
+            logger.error(
+                "%s file duplicate added to s3 at the following time: %s",
+                file_key,
+                created_at_formatted_str,
+            )
+            raise DuplicateFileError(f"Duplicate file: {file_key}")
+
+        # If a file is already processing for this supplier_vaccine, this file has been queued
+        if processing_exists:
+            logger.info(
+                "%s file queued for processing at time: %s",
+                file_key,
+                created_at_formatted_str,
+            )
+            return False
+
+        return True
     except Exception as error:  # pylint: disable = broad-exception-caught
-        error_message = f"Error adding {file_key} to the audit table"
+        error_message = f"Error adding {file_key} to the audit table: {error}"
         logger.error(error_message)
         raise UnhandledAuditTableError(error_message) from error
-
-    # If a duplicte exists, raise an exception
-    if duplicate_exists:
-        
logger.error("%s file duplicate added to s3 at the following time: %s", file_key, created_at_formatted_str) - raise DuplicateFileError(f"Duplicate file: {file_key}") + +def get_queued_file_details(queue_name: str): + + table_name = os.environ["AUDIT_TABLE_NAME"] + queue_name_gsi = "queue_name_index" + + queue_response = dynamodb_resource.Table(table_name).query( + IndexName=queue_name_gsi, + KeyConditionExpression=Key("queue_name").eq(queue_name) + & Key("status").eq("Queued"), + ) + if queue_response["Items"]: + file_name, message_id = get_file_name(queue_response) + return file_name, message_id + else: + return None, None + + +def get_file_name(queue_response: dict): + sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"]) + first_record = sorted_item[0] + file_name = first_record.get("filename") + message_id = first_record.get("message_id") + return file_name, message_id diff --git a/filenameprocessor/src/clients.py b/filenameprocessor/src/clients.py index 96893689e..fefbdd736 100644 --- a/filenameprocessor/src/clients.py +++ b/filenameprocessor/src/clients.py @@ -13,6 +13,7 @@ sqs_client = boto3_client("sqs", region_name=REGION_NAME) dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME) firehose_client = boto3_client("firehose", region_name=REGION_NAME) +lambda_client = boto3_client('lambda', region_name=REGION_NAME) dynamodb_resource = boto3_resource("dynamodb", region_name=REGION_NAME) diff --git a/filenameprocessor/src/errors.py b/filenameprocessor/src/errors.py index 2faa63e85..6f33274bf 100644 --- a/filenameprocessor/src/errors.py +++ b/filenameprocessor/src/errors.py @@ -5,6 +5,10 @@ class DuplicateFileError(Exception): """A custom exception for when it is identified that the file is a duplicate.""" +class ProcessingError(Exception): + """A custom exception for when it is identified that supplier_vaccine file is under processing""" + + class UnhandledAuditTableError(Exception): """A custom exception for when an unexpected error occurs whilst adding the file to the audit table.""" diff --git a/filenameprocessor/src/file_name_processor.py b/filenameprocessor/src/file_name_processor.py index 0e058a059..a1c46ce69 100644 --- a/filenameprocessor/src/file_name_processor.py +++ b/filenameprocessor/src/file_name_processor.py @@ -6,12 +6,17 @@ (ODS code has multiple lengths) """ +import os from uuid import uuid4 -from utils_for_filenameprocessor import get_created_at_formatted_string +from utils_for_filenameprocessor import ( + get_created_at_formatted_string, + move_file, + invoke_filename_lambda, +) from file_key_validation import validate_file_key from send_sqs_message import make_and_send_sqs_message from make_and_upload_ack_file import make_and_upload_the_ack_file -from audit_table import add_to_audit_table +from audit_table import upsert_audit_table, get_queued_file_details from clients import logger from elasticcache import upload_to_elasticache from logging_decorator import logging_decorator @@ -25,6 +30,8 @@ UnhandledSqsError, ) +FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME") + # NOTE: logging_decorator is applied to handle_record function, rather than lambda_handler, because # the logging_decorator is for an individual record, whereas the lambda_handle could potentially be handling @@ -38,35 +45,70 @@ def handle_record(record) -> dict: try: bucket_name = record["s3"]["bucket"]["name"] file_key = record["s3"]["object"]["key"] + except Exception as error: # pylint: disable=broad-except logger.error("Error obtaining file_key: 
%s", error) - return {"statusCode": 500, "message": "Failed to download file key", "error": str(error)} + return { + "statusCode": 500, + "message": "Failed to download file key", + "error": str(error), + } - if "data-sources" in bucket_name: + if "data-sources" in bucket_name and "/" not in file_key: try: - # Get message details + query_type = "create" # Type of operation on the audit db message_id = str(uuid4()) # Assign a unique message_id for the file - created_at_formatted_string = get_created_at_formatted_string(bucket_name, file_key) - - # Process the file - add_to_audit_table(message_id, file_key, created_at_formatted_string) - vaccine_type, supplier = validate_file_key(file_key) - permissions = validate_vaccine_type_permissions(vaccine_type=vaccine_type, supplier=supplier) - make_and_send_sqs_message( - file_key, message_id, permissions, vaccine_type, supplier, created_at_formatted_string - ) - - logger.info("File '%s' successfully processed", file_key) - - # Return details for logs - return { - "statusCode": 200, - "message": "Successfully sent to SQS queue", - "file_key": file_key, - "message_id": message_id, - "vaccine_type": vaccine_type, - "supplier": supplier, - } + if "message_id" in record: + message_id = record["message_id"] + query_type = "update" + # Get message details + if file_key and message_id is not None: + + created_at_formatted_string = get_created_at_formatted_string( + bucket_name, file_key + ) + vaccine_type = "unknown" + supplier = "unknown" + vaccine_type, supplier = validate_file_key(file_key) + permissions = validate_vaccine_type_permissions( + vaccine_type=vaccine_type, supplier=supplier + ) + # Process the file + # TODO rename to add clarity + status = True # Based on the status the file will be forwarded to sqs fifo queue. 
+ status = upsert_audit_table( + message_id, + file_key, + created_at_formatted_string, + f"{supplier}_{vaccine_type}", + "Processing", + query_type, + ) + if status: + make_and_send_sqs_message( + file_key, + message_id, + permissions, + vaccine_type, + supplier, + created_at_formatted_string, + ) + + logger.info("File '%s' successfully processed", file_key) + + # Return details for logs + # TODO Update message + return { + "statusCode": 200, + "message": "Successfully sent to SQS queue", + "file_key": file_key, + "message_id": message_id, + "vaccine_type": vaccine_type, + "supplier": supplier, + } + else: + # TODO Update the logger + logger.info("No files are in queue") except ( # pylint: disable=broad-exception-caught VaccineTypePermissionsError, @@ -78,7 +120,16 @@ def handle_record(record) -> dict: Exception, ) as error: logger.error("Error processing file '%s': %s", file_key, str(error)) - + # Process the file if the error is not of type Duplicate since it is already updated in audit table + if not isinstance(error, DuplicateFileError): + upsert_audit_table( + message_id, + file_key, + created_at_formatted_string, + f"{supplier}_{vaccine_type}", + "Processed", + query_type, + ) # Create ack file # (note that error may have occurred before message_id and created_at_formatted_string were generated) message_delivered = False @@ -86,7 +137,17 @@ def handle_record(record) -> dict: message_id = "Message id was not created" if "created_at_formatted_string" not in locals(): created_at_formatted_string = "created_at_time not identified" - make_and_upload_the_ack_file(message_id, file_key, message_delivered, created_at_formatted_string) + make_and_upload_the_ack_file( + message_id, file_key, message_delivered, created_at_formatted_string + ) + destination_key = f"archive/{file_key}" + move_file(bucket_name, file_key, destination_key) + # Following code will get executed in case of duplicate scenario, vaccine permission error, etc + file_key, message_id = get_queued_file_details(f"{supplier}_{vaccine_type}") + if file_key and message_id is not None: + invoke_filename_lambda( + FILE_NAME_PROC_LAMBDA_NAME, bucket_name, file_key, message_id + ) status_code_map = { VaccineTypePermissionsError: 403, @@ -111,7 +172,11 @@ def handle_record(record) -> dict: try: upload_to_elasticache(file_key, bucket_name) logger.info("%s content successfully uploaded to cache", file_key) - return {"statusCode": 200, "message": "File content successfully uploaded to cache", "file_key": file_key} + return { + "statusCode": 200, + "message": "File content successfully uploaded to cache", + "file_key": file_key, + } except Exception as error: # pylint: disable=broad-except logger.error("Error uploading to cache for file '%s': %s", file_key, error) return { @@ -122,7 +187,11 @@ def handle_record(record) -> dict: } else: - logger.error("Unable to process file %s due to unexpected bucket name %s", file_key, bucket_name) + logger.error( + "Unable to process file %s due to unexpected bucket name %s", + file_key, + bucket_name, + ) return { "statusCode": 500, "message": f"Failed to process file due to unexpected bucket name {bucket_name}", @@ -134,7 +203,6 @@ def lambda_handler(event: dict, context) -> None: # pylint: disable=unused-argu """Lambda handler for filenameprocessor lambda. 
Processes each record in event records.""" logger.info("Filename processor lambda task started") - for record in event["Records"]: handle_record(record) diff --git a/filenameprocessor/src/send_sqs_message.py b/filenameprocessor/src/send_sqs_message.py index 0c03344ce..8496e1d2e 100644 --- a/filenameprocessor/src/send_sqs_message.py +++ b/filenameprocessor/src/send_sqs_message.py @@ -6,7 +6,7 @@ from errors import InvalidSupplierError, UnhandledSqsError -def send_to_supplier_queue(message_body: dict) -> None: +def send_to_supplier_queue(message_body: dict, vaccine_type: str) -> None: """Sends a message to the supplier queue. Raises an exception if the message is not successfully sent.""" # Check the supplier has been identified (this should already have been validated by initial file validation) if not (supplier := message_body["supplier"]): @@ -16,7 +16,8 @@ def send_to_supplier_queue(message_body: dict) -> None: try: queue_url = os.getenv("QUEUE_URL") - sqs_client.send_message(QueueUrl=queue_url, MessageBody=json_dumps(message_body), MessageGroupId=supplier) + sqs_client.send_message(QueueUrl=queue_url, MessageBody=json_dumps(message_body), + MessageGroupId=f"{supplier}_{vaccine_type}") logger.info("Message sent to SQS queue for supplier: %s", supplier) except Exception as error: # pylint: disable=broad-exception-caught error_message = f"An unexpected error occurred whilst sending to SQS: {error}" @@ -37,4 +38,4 @@ def make_and_send_sqs_message( "created_at_formatted_string": created_at_formatted_string, } - send_to_supplier_queue(message_body) + send_to_supplier_queue(message_body, vaccine_type) diff --git a/filenameprocessor/src/utils_for_filenameprocessor.py b/filenameprocessor/src/utils_for_filenameprocessor.py index 128a0522c..e224a6d15 100644 --- a/filenameprocessor/src/utils_for_filenameprocessor.py +++ b/filenameprocessor/src/utils_for_filenameprocessor.py @@ -1,9 +1,9 @@ """Utils for filenameprocessor lambda""" - +import json from csv import DictReader from io import StringIO from constants import Constants -from clients import s3_client +from clients import s3_client, logger, lambda_client def get_created_at_formatted_string(bucket_name: str, file_key: str) -> str: @@ -25,3 +25,37 @@ def identify_supplier(ods_code: str) -> str: Defaults to empty string if ODS code isn't found in the mappings. """ return Constants.ODS_TO_SUPPLIER_MAPPINGS.get(ods_code, "") + + +def move_file(bucket_name: str, source_key: str, destination_key: str) -> None: + + """ Moves a file from one location to another in S3 by copying and then deleting it. Args: + bucket_name (str): Name of the S3 bucket. source_key (str): Source file key. 
+ destination_key (str): Destination file key.""" + s3_client.copy_object( + Bucket=bucket_name, + CopySource={"Bucket": bucket_name, "Key": source_key}, + Key=destination_key + ) + s3_client.delete_object(Bucket=bucket_name, Key=source_key) + logger.info("File moved from %s to %s", source_key, destination_key) + + +def invoke_filename_lambda(file_name_processor_name, source_bucket_name, file_key, message_id): + lambda_payload = {"Records": [ + { + "s3": { + "bucket": { + "name": source_bucket_name + }, + "object": { + "key": file_key + } + }, + "message_id": message_id} + ] + } + lambda_client.invoke( + FunctionName=file_name_processor_name, + InvocationType="Event", + Payload=json.dumps(lambda_payload)) diff --git a/infra/audit_db.tf b/infra/audit_db.tf deleted file mode 100644 index a1548f991..000000000 --- a/infra/audit_db.tf +++ /dev/null @@ -1,29 +0,0 @@ -resource "aws_dynamodb_table" "audit-table" { - name = "immunisation-batch-${local.env}-audit-table" - billing_mode = "PAY_PER_REQUEST" - hash_key = "message_id" - - attribute { - name = "message_id" - type = "S" - } - - attribute { - name = "filename" - type = "S" - } - - global_secondary_index { - name = "filename_index" - hash_key = "filename" - projection_type = "KEYS_ONLY" - } - - point_in_time_recovery { - enabled = local.environment == "prod" ? true : false - } - server_side_encryption { - enabled = true - kms_key_arn = data.aws_kms_key.existing_dynamo_encryption_arn.arn - } -} \ No newline at end of file diff --git a/infra/non-prod/audit_db.tf b/infra/non-prod/audit_db.tf new file mode 100644 index 000000000..0290c15eb --- /dev/null +++ b/infra/non-prod/audit_db.tf @@ -0,0 +1,131 @@ +resource "aws_dynamodb_table" "audit-table-int" { + name = "immunisation-batch-int-audit-table" + billing_mode = "PAY_PER_REQUEST" + hash_key = "message_id" + + attribute { + name = "message_id" + type = "S" + } + + attribute { + name = "filename" + type = "S" + } + + attribute { + name = "queue_name" + type = "S" + } + + attribute { + name = "status" + type = "S" + } + + global_secondary_index { + name = "filename_index" + hash_key = "filename" + projection_type = "ALL" + } + + global_secondary_index { + name = "queue_name_index" + hash_key = "queue_name" + range_key = "status" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} + +resource "aws_dynamodb_table" "audit-table-ref" { + name = "immunisation-batch-ref-audit-table" + billing_mode = "PAY_PER_REQUEST" + hash_key = "message_id" + + attribute { + name = "message_id" + type = "S" + } + + attribute { + name = "filename" + type = "S" + } + + attribute { + name = "queue_name" + type = "S" + } + + attribute { + name = "status" + type = "S" + } + + global_secondary_index { + name = "filename_index" + hash_key = "filename" + projection_type = "ALL" + } + + global_secondary_index { + name = "queue_name_index" + hash_key = "queue_name" + range_key = "status" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} + +resource "aws_dynamodb_table" "audit-table" { + name = "immunisation-batch-internal-dev-audit-table" + billing_mode = "PAY_PER_REQUEST" + hash_key = "message_id" + + attribute { + name = "message_id" + type = "S" + } + + attribute { + name = "filename" + type = "S" + } + + attribute { + name = "queue_name" + type = "S" + } + + attribute { + name = "status" + type = "S" + } + + global_secondary_index { + name = 
"filename_index" + hash_key = "filename" + projection_type = "ALL" + } + + global_secondary_index { + name = "queue_name_index" + hash_key = "queue_name" + range_key = "status" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} \ No newline at end of file diff --git a/infra/non-prod/delta_db.tf b/infra/non-prod/delta_db.tf new file mode 100644 index 000000000..8d337b1ed --- /dev/null +++ b/infra/non-prod/delta_db.tf @@ -0,0 +1,155 @@ +resource "aws_dynamodb_table" "delta-dynamodb-int-table" { + name = "imms-int-delta" + billing_mode = "PAY_PER_REQUEST" + hash_key = "PK" + attribute { + name = "PK" + type = "S" + } + attribute { + name = "DateTimeStamp" + type = "S" + } + attribute { + name = "Operation" + type = "S" + } + + attribute { + name = "VaccineType" + type = "S" + } + + attribute { + name = "SupplierSystem" + type = "S" + } + + ttl { + attribute_name = "ExpiresAt" + enabled = true + } + + global_secondary_index { + name = "SearchIndex" + hash_key = "Operation" + range_key = "DateTimeStamp" + projection_type = "ALL" + } + + global_secondary_index { + name = "SecondarySearchIndex" + hash_key = "SupplierSystem" + range_key = "VaccineType" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} + +resource "aws_dynamodb_table" "delta-dynamodb-ref-table" { + name = "imms-ref-delta" + billing_mode = "PAY_PER_REQUEST" + hash_key = "PK" + attribute { + name = "PK" + type = "S" + } + attribute { + name = "DateTimeStamp" + type = "S" + } + attribute { + name = "Operation" + type = "S" + } + + attribute { + name = "VaccineType" + type = "S" + } + + attribute { + name = "SupplierSystem" + type = "S" + } + + ttl { + attribute_name = "ExpiresAt" + enabled = true + } + + global_secondary_index { + name = "SearchIndex" + hash_key = "Operation" + range_key = "DateTimeStamp" + projection_type = "ALL" + } + + global_secondary_index { + name = "SecondarySearchIndex" + hash_key = "SupplierSystem" + range_key = "VaccineType" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} + +resource "aws_dynamodb_table" "delta-dynamodb-table" { + name = "imms-internal-dev-delta" + billing_mode = "PAY_PER_REQUEST" + hash_key = "PK" + attribute { + name = "PK" + type = "S" + } + attribute { + name = "DateTimeStamp" + type = "S" + } + attribute { + name = "Operation" + type = "S" + } + + attribute { + name = "VaccineType" + type = "S" + } + + attribute { + name = "SupplierSystem" + type = "S" + } + + ttl { + attribute_name = "ExpiresAt" + enabled = true + } + + global_secondary_index { + name = "SearchIndex" + hash_key = "Operation" + range_key = "DateTimeStamp" + projection_type = "ALL" + } + + global_secondary_index { + name = "SecondarySearchIndex" + hash_key = "SupplierSystem" + range_key = "VaccineType" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} \ No newline at end of file diff --git a/infra/non-prod/endpoints.tf b/infra/non-prod/endpoints.tf new file mode 100644 index 000000000..ad3773f56 --- /dev/null +++ b/infra/non-prod/endpoints.tf @@ -0,0 +1,264 @@ +resource "aws_security_group" "lambda_redis_sg" { + vpc_id = data.aws_vpc.default.id + name = "immunisation-security-group" + + # Inbound rule to allow traffic only from the VPC CIDR block + ingress { + from_port = 0 + to_port = 
0 + protocol = "-1" + cidr_blocks = ["172.31.0.0/16"] + } + + egress { + cidr_blocks = [] + from_port = 0 + ipv6_cidr_blocks = [] + prefix_list_ids = [ + "pl-b3a742da", + "pl-93a247fa", + "pl-7ca54015", + ] + protocol = "-1" + security_groups = [] + self = true + to_port = 0 + } +} + +resource "aws_vpc_endpoint" "sqs_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.sqs" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow" + Principal = { + AWS ="*" + }, + Action = [ + "sqs:SendMessage", + "sqs:ReceiveMessage", + "kms:Decrypt" + ] + Resource = "*", + } + ] + }) + tags = { + Name = "immunisation-sqs-endpoint" + } +} + +resource "aws_vpc_endpoint" "s3_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.s3" + + route_table_ids = [ + for rt in data.aws_route_tables.default_route_tables.ids : rt + ] + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow" + Principal = { + AWS ="*" + }, + Action = [ + "s3:GetObject", + "s3:PutObject", + "s3:ListBucket", + "s3:DeleteObject", + "s3:CopyObject" + ] + Resource = "*", + } + ] + }) + tags = { + Name = "immunisation-s3-endpoint" + } +} + +resource "aws_vpc_endpoint" "kinesis_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.kinesis-firehose" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Principal = "*", + Action = [ + "firehose:ListDeliveryStreams", + "firehose:PutRecord", + "firehose:PutRecordBatch" + ], + Resource = "*" + } + ] + }) + tags = { + Name = "immunisation-kinesis-endpoint" + } +} + +resource "aws_vpc_endpoint" "dynamodb" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.dynamodb" + + route_table_ids = [ + for rt in data.aws_route_tables.default_route_tables.ids : rt + ] + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + "Effect": "Allow", + "Principal": "*", + "Action": "*", + "Resource": "*" + } + ] + }) + tags = { + Name = "immunisation-dynamo-endpoint" + } + + +} + + +resource "aws_vpc_endpoint" "ecr_api" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.ecr.api" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + tags = { + Name = "immunisation-ecr-api-endpoint" + } +} + +resource "aws_vpc_endpoint" "ecr_dkr" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.ecr.dkr" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + tags = { + Name = "immunisation-ecr-dkr-endpoint" + } +} + +resource "aws_vpc_endpoint" "cloud_watch" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.logs" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = 
true + tags = { + Name = "immunisation-cloud-watch-endpoint" + } +} + + +resource "aws_vpc_endpoint" "kinesis_stream_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.kinesis-streams" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Principal = { + AWS ="*" + }, + Action = [ + "kinesis:ListShards", + "kinesis:ListStreams", + "kinesis:PutRecord", + "kinesis:PutRecords" + ], + Resource = "*" + } + ] + }) + tags = { + Name = "immunisation-kinesis-streams-endpoint" + } +} + +resource "aws_vpc_endpoint" "kms_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.kms" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Principal = "*", + Action = [ + "kms:Decrypt", + "kms:Encrypt", + "kms:GenerateDataKey*" + ], + Resource = [ + "arn:aws:kms:eu-west-2:345594581768:key/648c8c6f-54bf-4b79-ad72-0be6e8d72423", + "arn:aws:kms:eu-west-2:345594581768:key/9bbfbfd9-1745-4325-a9b7-33d1f6be89c1" + ] + } + ] + }) + tags = { + Name = "immunisation-kms-endpoint" + } +} + + +resource "aws_vpc_endpoint" "lambda_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.lambda" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + tags = { + Name = "immunisation-lambda-endpoint" + } +} \ No newline at end of file diff --git a/infra/non-prod/imms_db.tf b/infra/non-prod/imms_db.tf new file mode 100644 index 000000000..0b2a053ae --- /dev/null +++ b/infra/non-prod/imms_db.tf @@ -0,0 +1,140 @@ +resource "aws_dynamodb_table" "events-dynamodb-int-table" { + name = "imms-int-imms-events" + billing_mode = "PAY_PER_REQUEST" + hash_key = "PK" + stream_enabled = true + stream_view_type = "NEW_IMAGE" + + attribute { + name = "PK" + type = "S" + } + attribute { + name = "PatientPK" + type = "S" + } + attribute { + name = "PatientSK" + type = "S" + } + attribute { + name = "IdentifierPK" + type = "S" + } + + tags = { + NHSE-Enable-Dynamo-Backup = "True" + } + + global_secondary_index { + name = "PatientGSI" + hash_key = "PatientPK" + range_key = "PatientSK" + projection_type = "ALL" + } + + global_secondary_index { + name = "IdentifierGSI" + hash_key = "IdentifierPK" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} + +resource "aws_dynamodb_table" "events-dynamodb-ref-table" { + name = "imms-ref-imms-events" + billing_mode = "PAY_PER_REQUEST" + hash_key = "PK" + stream_enabled = true + stream_view_type = "NEW_IMAGE" + + attribute { + name = "PK" + type = "S" + } + attribute { + name = "PatientPK" + type = "S" + } + attribute { + name = "PatientSK" + type = "S" + } + attribute { + name = "IdentifierPK" + type = "S" + } + + tags = { + NHSE-Enable-Dynamo-Backup = "True" + } + + global_secondary_index { + name = "PatientGSI" + hash_key = "PatientPK" + range_key = "PatientSK" + projection_type = "ALL" + } + + global_secondary_index { + name = "IdentifierGSI" + hash_key = 
"IdentifierPK" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} + +resource "aws_dynamodb_table" "events-dynamodb-table" { + name = "imms-internal-dev-imms-events" + billing_mode = "PAY_PER_REQUEST" + hash_key = "PK" + stream_enabled = true + stream_view_type = "NEW_IMAGE" + + attribute { + name = "PK" + type = "S" + } + attribute { + name = "PatientPK" + type = "S" + } + attribute { + name = "PatientSK" + type = "S" + } + attribute { + name = "IdentifierPK" + type = "S" + } + + tags = { + NHSE-Enable-Dynamo-Backup = "True" + } + + global_secondary_index { + name = "PatientGSI" + hash_key = "PatientPK" + range_key = "PatientSK" + projection_type = "ALL" + } + + global_secondary_index { + name = "IdentifierGSI" + hash_key = "IdentifierPK" + projection_type = "ALL" + } + + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } +} \ No newline at end of file diff --git a/infra/kinesis_role.tf b/infra/non-prod/kinesis_role.tf similarity index 82% rename from infra/kinesis_role.tf rename to infra/non-prod/kinesis_role.tf index 77bc9f1ce..5acbc1254 100644 --- a/infra/kinesis_role.tf +++ b/infra/non-prod/kinesis_role.tf @@ -1,7 +1,3 @@ -data "aws_kms_key" "existing_kinesis_encryption_key" { - key_id = "alias/imms-batch-kinesis-stream-encryption" -} - resource "aws_iam_role" "kinesis_role" { name = "kinesis-role" @@ -32,7 +28,7 @@ resource "aws_iam_policy" "kinesis_kms_policy" { "kms:ReEncrypt*", "kms:DescribeKey" ] - Resource = data.aws_kms_key.existing_kinesis_encryption_key.arn + Resource = aws_kms_key.kinesis_stream_encryption.arn }] }) } diff --git a/infra/kms_dynamo.tf b/infra/non-prod/kms_dynamo.tf similarity index 84% rename from infra/kms_dynamo.tf rename to infra/non-prod/kms_dynamo.tf index eaab93046..b0ddb6144 100644 --- a/infra/kms_dynamo.tf +++ b/infra/non-prod/kms_dynamo.tf @@ -49,6 +49,19 @@ "kms:GenerateDataKey*" ], "Resource": "*" + }, + { + "Sid": "AllowAccountA", + "Effect": "Allow", + "Principal": { + "AWS": "arn:aws:iam::603871901111:root" + }, + "Action": [ + "kms:Encrypt", + "kms:Decrypt", + "kms:GenerateDataKey*" + ], + "Resource": "*" } ] } @@ -59,8 +72,3 @@ resource "aws_kms_alias" "dynamodb_encryption" { name = "alias/imms-event-dynamodb-encryption" target_key_id = aws_kms_key.dynamodb_encryption.key_id } - -data "aws_kms_key" "existing_dynamo_encryption_arn" { - key_id = "alias/imms-event-dynamodb-encryption" - arn = aws_kms_key.dynamodb_encryption.arn -} \ No newline at end of file diff --git a/infra/kms_kinesis.tf b/infra/non-prod/kms_kinesis.tf similarity index 100% rename from infra/kms_kinesis.tf rename to infra/non-prod/kms_kinesis.tf diff --git a/infra/kms_lambda.tf b/infra/non-prod/kms_lambda.tf similarity index 100% rename from infra/kms_lambda.tf rename to infra/non-prod/kms_lambda.tf diff --git a/infra/kms_s3.tf b/infra/non-prod/kms_s3.tf similarity index 100% rename from infra/kms_s3.tf rename to infra/non-prod/kms_s3.tf diff --git a/infra/main.tf b/infra/non-prod/main.tf similarity index 100% rename from infra/main.tf rename to infra/non-prod/main.tf diff --git a/infra/policies/s3_batch_dest_policy.json b/infra/non-prod/policies/s3_batch_dest_policy.json similarity index 100% rename from infra/policies/s3_batch_dest_policy.json rename to infra/non-prod/policies/s3_batch_dest_policy.json diff --git a/infra/rediscache.tf b/infra/non-prod/rediscache.tf similarity index 100% rename from infra/rediscache.tf rename to 
infra/non-prod/rediscache.tf diff --git a/infra/non-prod/s3_config.tf b/infra/non-prod/s3_config.tf new file mode 100644 index 000000000..c8cf5513f --- /dev/null +++ b/infra/non-prod/s3_config.tf @@ -0,0 +1,44 @@ +resource "aws_s3_bucket" "batch_config_bucket" { + bucket = "imms-internal-dev-supplier-config" + + tags = { + "NHSE-Enable-S3-Backup" = "True" + } +} + +resource "aws_s3_bucket_public_access_block" "batch_config_bucket_public_access_block" { + bucket = aws_s3_bucket.batch_config_bucket.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_policy" "batch_config_bucket_policy" { + bucket = aws_s3_bucket.batch_config_bucket.id + + policy = jsonencode({ + Version = "2012-10-17" + Id = "batch_config_bucket_policy" + Statement = [ + { + Sid = "HTTPSOnly" + Effect = "Deny" + Principal = { + AWS = "arn:aws:iam::${local.local_account_id}:root" + } + Action = "s3:*" + Resource = [ + aws_s3_bucket.batch_config_bucket.arn, + "${aws_s3_bucket.batch_config_bucket.arn}/*", + ] + Condition = { + Bool = { + "aws:SecureTransport" = "false" + } + } + }, + ] + }) +} diff --git a/infra/non-prod/s3_dest_bucket.tf b/infra/non-prod/s3_dest_bucket.tf new file mode 100644 index 000000000..5283d7876 --- /dev/null +++ b/infra/non-prod/s3_dest_bucket.tf @@ -0,0 +1,111 @@ +locals { + policy_path = "${path.root}/policies" + env = terraform.workspace + is_temp = length(regexall("[a-z]{2,4}-?[0-9]+", local.env)) > 0 + } + +resource "aws_s3_bucket" "batch_data_destination_int_bucket" { + bucket = "immunisation-batch-int-data-destinations" + force_destroy = local.is_temp + tags = { + "Environment" = "int" + "Project" = "immunisation" + "Service" = "fhir-api" + } +} + +data "aws_iam_policy_document" "batch_data_destination_int_bucket_policy" { + source_policy_documents = [ + templatefile("${local.policy_path}/s3_batch_dest_policy.json", { + "bucket-name" : aws_s3_bucket.batch_data_destination_int_bucket.bucket + } ), + ] +} + +resource "aws_s3_bucket_policy" "batch_data_destination_int_bucket_policy" { + bucket = aws_s3_bucket.batch_data_destination_int_bucket.id + policy = data.aws_iam_policy_document.batch_data_destination_int_bucket_policy.json +} + +resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_destination_int_encryption" { + bucket = aws_s3_bucket.batch_data_destination_int_bucket.id + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn + sse_algorithm = "aws:kms" + } + } +} + + +resource "aws_s3_bucket" "batch_data_destination_ref_bucket" { + bucket = "immunisation-batch-ref-data-destinations" + force_destroy = local.is_temp + tags = { + "Environment" = "ref" + "Project" = "immunisation" + "Service" = "fhir-api" + } +} + +data "aws_iam_policy_document" "batch_data_destination_ref_bucket_policy" { + source_policy_documents = [ + templatefile("${local.policy_path}/s3_batch_dest_policy.json", { + "bucket-name" : aws_s3_bucket.batch_data_destination_ref_bucket.bucket + } ), + ] +} + +resource "aws_s3_bucket_policy" "batch_data_destination_ref_bucket_policy" { + bucket = aws_s3_bucket.batch_data_destination_ref_bucket.id + policy = data.aws_iam_policy_document.batch_data_destination_ref_bucket_policy.json +} + +resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_destination_ref_encryption" { + bucket = aws_s3_bucket.batch_data_destination_ref_bucket.id + + rule { + 
apply_server_side_encryption_by_default { + kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn + sse_algorithm = "aws:kms" + } + } +} + +resource "aws_s3_bucket" "batch_data_destination_bucket" { + bucket = "immunisation-batch-internal-dev-data-destinations" + force_destroy = local.is_temp + tags = { + "Environment" = "internal-dev" + "Project" = "immunisation" + "Service" = "fhir-api" + } +} + +data "aws_iam_policy_document" "batch_data_destination_bucket_policy" { + source_policy_documents = [ + templatefile("${local.policy_path}/s3_batch_dest_policy.json", { + "bucket-name" : aws_s3_bucket.batch_data_destination_bucket.bucket + } ), + ] +} + +resource "aws_s3_bucket_policy" "batch_data_destination_bucket_policy" { + bucket = aws_s3_bucket.batch_data_destination_bucket.id + policy = data.aws_iam_policy_document.batch_data_destination_bucket_policy.json +} + +resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_destination_encryption" { + bucket = aws_s3_bucket.batch_data_destination_bucket.id + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn + sse_algorithm = "aws:kms" + } + } +} + + + diff --git a/infra/variables.tf b/infra/non-prod/variables.tf similarity index 100% rename from infra/variables.tf rename to infra/non-prod/variables.tf diff --git a/infra/prod/audit_db.tf b/infra/prod/audit_db.tf new file mode 100644 index 000000000..ab86fd947 --- /dev/null +++ b/infra/prod/audit_db.tf @@ -0,0 +1,52 @@ +resource "aws_dynamodb_table" "audit-table" { + name = "immunisation-batch-prod-audit-table" + billing_mode = "PAY_PER_REQUEST" + hash_key = "message_id" + + attribute { + name = "message_id" + type = "S" + } + + attribute { + name = "filename" + type = "S" + } + + attribute { + name = "queue_name" + type = "S" + } + + attribute { + name = "status" + type = "S" + } + + global_secondary_index { + name = "filename_index" + hash_key = "filename" + projection_type = "ALL" + } + + global_secondary_index { + name = "queue_name_index" + hash_key = "queue_name" + range_key = "status" + projection_type = "ALL" + } + + point_in_time_recovery { + enabled = true + } + server_side_encryption { + enabled = true + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } + + tags = { + "Environment" = "prod" + "Project" = "immunisation" + "Service" = "fhir-api" + } +} \ No newline at end of file diff --git a/infra/delta_db.tf b/infra/prod/delta_db.tf similarity index 81% rename from infra/delta_db.tf rename to infra/prod/delta_db.tf index 7f052bff0..4252b83ab 100644 --- a/infra/delta_db.tf +++ b/infra/prod/delta_db.tf @@ -1,5 +1,5 @@ resource "aws_dynamodb_table" "delta-dynamodb-table" { - name = "imms-${local.env}-delta" + name = "imms-prod-delta" billing_mode = "PAY_PER_REQUEST" hash_key = "PK" attribute { @@ -45,10 +45,16 @@ resource "aws_dynamodb_table" "delta-dynamodb-table" { } point_in_time_recovery { - enabled = local.environment == "prod" ? 
true : false + enabled = true } server_side_encryption { enabled = true - kms_key_arn = data.aws_kms_key.existing_dynamo_encryption_arn.arn + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } + + tags = { + "Environment" = "prod" + "Project" = "immunisation" + "Service" = "fhir-api" } } \ No newline at end of file diff --git a/infra/endpoints.tf b/infra/prod/endpoints.tf similarity index 70% rename from infra/endpoints.tf rename to infra/prod/endpoints.tf index 762c00581..cd44772b8 100644 --- a/infra/endpoints.tf +++ b/infra/prod/endpoints.tf @@ -47,7 +47,7 @@ resource "aws_vpc_endpoint" "sqs_endpoint" { Effect = "Allow" Principal = { "AWS": [ - "arn:aws:iam::${local.local_account_id}:root" + "*" ] }, Action = [ @@ -55,10 +55,7 @@ resource "aws_vpc_endpoint" "sqs_endpoint" { "sqs:ReceiveMessage", "kms:Decrypt" ] - Resource = ["arn:aws:sqs:${var.aws_region}:${local.local_account_id}:${var.project_short_name}-int-metadata-queue.fifo", - "arn:aws:sqs:${var.aws_region}:${local.local_account_id}:${var.project_short_name}-ref-metadata-queue.fifo", - "arn:aws:sqs:${var.aws_region}:${local.local_account_id}:${var.project_short_name}-internal-dev-metadata-queue.fifo", - "arn:aws:sqs:${var.aws_region}:${local.local_account_id}:${var.project_short_name}-pr-78-metadata-queue.fifo"] + Resource = "*" } ] }) @@ -81,36 +78,16 @@ resource "aws_vpc_endpoint" "s3_endpoint" { { Effect = "Allow" Principal = { - "AWS": [ - "arn:aws:iam::${local.local_account_id}:root" - ] + "AWS": "*" }, Action = [ "s3:GetObject", "s3:PutObject", - "s3:ListBucket" - ] - Resource = [ - "arn:aws:s3:::${var.project_name}-pr-78-data-sources", - "arn:aws:s3:::${var.project_name}-pr-78-data-sources/*", - "arn:aws:s3:::${var.project_name}-int-data-sources", - "arn:aws:s3:::${var.project_name}-int-data-sources/*", - "arn:aws:s3:::${var.project_name}-ref-data-sources", - "arn:aws:s3:::${var.project_name}-ref-data-sources/*", - "arn:aws:s3:::${var.project_name}-internal-dev-data-sources", - "arn:aws:s3:::${var.project_name}-internal-dev-data-sources/*", - "arn:aws:s3:::${var.project_name}-pr-78-data-destinations", - "arn:aws:s3:::${var.project_name}-pr-78-data-destinations/*", - "arn:aws:s3:::${var.project_name}-int-data-destinations", - "arn:aws:s3:::${var.project_name}-int-data-destinations/*", - "arn:aws:s3:::${var.project_name}-ref-data-destinations", - "arn:aws:s3:::${var.project_name}-ref-data-destinations/*", - "arn:aws:s3:::${var.project_name}-internal-dev-data-destinations", - "arn:aws:s3:::${var.project_name}-internal-dev-data-destinations/*", - "arn:aws:s3:::${aws_s3_bucket.batch_config_bucket.bucket}", - "arn:aws:s3:::${aws_s3_bucket.batch_config_bucket.bucket}/*", - "arn:aws:s3:::prod-${var.aws_region}-starport-layer-bucket/*" + "s3:ListBucket", + "s3:CopyObject", + "s3:DeleteObject" ] + Resource = "*" } ] }) @@ -133,11 +110,7 @@ resource "aws_vpc_endpoint" "kinesis_endpoint" { Statement = [ { Effect = "Allow", - Principal = { - "AWS":[ - "arn:aws:iam::${local.local_account_id}:root" - ] - }, + Principal = "*", Action = [ "firehose:ListDeliveryStreams", "firehose:PutRecord", @@ -235,7 +208,7 @@ resource "aws_vpc_endpoint" "kinesis_stream_endpoint" { Effect = "Allow", Principal = { "AWS":[ - "arn:aws:iam::${local.local_account_id}:root" + "*" ] }, Action = [ @@ -252,3 +225,49 @@ resource "aws_vpc_endpoint" "kinesis_stream_endpoint" { Name = "immunisation-kinesis-streams-endpoint" } } + +resource "aws_vpc_endpoint" "kms_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.kms" + 
vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Principal = "*", + Action = [ + "kms:Decrypt", + "kms:Encrypt", + "kms:GenerateDataKey*" + ], + Resource = [ + "arn:aws:kms:eu-west-2:664418956997:key/4e643221-4cb8-49c5-9a78-ced991ff52ae", + "arn:aws:kms:eu-west-2:664418956997:key/d7b3c213-3c05-4caf-bb95-fdb2a6e533b1" + ] + } + ] + }) + tags = { + Name = "immunisation-kms-endpoint" + } +} + +resource "aws_vpc_endpoint" "lambda_endpoint" { + vpc_id = data.aws_vpc.default.id + service_name = "com.amazonaws.${var.aws_region}.lambda" + vpc_endpoint_type = "Interface" + + subnet_ids = data.aws_subnets.default.ids + security_group_ids = [aws_security_group.lambda_redis_sg.id] + private_dns_enabled = true + tags = { + Name = "immunisation-lambda-endpoint" + } +} + diff --git a/infra/imms_db.tf b/infra/prod/imms_db.tf similarity index 80% rename from infra/imms_db.tf rename to infra/prod/imms_db.tf index f7ef0e988..68a470ed6 100644 --- a/infra/imms_db.tf +++ b/infra/prod/imms_db.tf @@ -1,5 +1,5 @@ resource "aws_dynamodb_table" "events-dynamodb-table" { - name = "imms-${local.env}-imms-events" + name = "imms-prod-imms-events" billing_mode = "PAY_PER_REQUEST" hash_key = "PK" stream_enabled = true @@ -24,6 +24,9 @@ resource "aws_dynamodb_table" "events-dynamodb-table" { tags = { NHSE-Enable-Dynamo-Backup = "True" + "Environment" = "prod" + "Project" = "immunisation" + "Service" = "fhir-api" } global_secondary_index { @@ -40,10 +43,10 @@ resource "aws_dynamodb_table" "events-dynamodb-table" { } point_in_time_recovery { - enabled = local.environment == "prod" ? 
true : false + enabled = true } server_side_encryption { enabled = true - kms_key_arn = data.aws_kms_key.existing_dynamo_encryption_arn.arn - } + kms_key_arn = aws_kms_key.dynamodb_encryption.arn + } } \ No newline at end of file diff --git a/infra/prod/kinesis_role.tf b/infra/prod/kinesis_role.tf new file mode 100644 index 000000000..5acbc1254 --- /dev/null +++ b/infra/prod/kinesis_role.tf @@ -0,0 +1,39 @@ +resource "aws_iam_role" "kinesis_role" { + name = "kinesis-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Effect = "Allow" + Principal = { + Service = "kinesis.amazonaws.com" + } + Action = "sts:AssumeRole" + }] + }) +} + +resource "aws_iam_policy" "kinesis_kms_policy" { + name = "kinesis-kms-policy" + description = "Allow Kinesis to use the KMS key" + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Effect = "Allow" + Action = [ + "kms:Encrypt", + "kms:Decrypt", + "kms:GenerateDataKey*", + "kms:ReEncrypt*", + "kms:DescribeKey" + ] + Resource = aws_kms_key.kinesis_stream_encryption.arn + }] + }) +} + +resource "aws_iam_role_policy_attachment" "kinesis_role_policy_attachment" { + role = aws_iam_role.kinesis_role.name + policy_arn = aws_iam_policy.kinesis_kms_policy.arn +} \ No newline at end of file diff --git a/infra/prod/kms_dynamo.tf b/infra/prod/kms_dynamo.tf new file mode 100644 index 000000000..7df6fc56e --- /dev/null +++ b/infra/prod/kms_dynamo.tf @@ -0,0 +1,74 @@ + resource "aws_kms_key" "dynamodb_encryption" { + description = "KMS key for DynamoDB encryption" + key_usage = "ENCRYPT_DECRYPT" + enable_key_rotation = true + policy = < 0 } resource "aws_s3_bucket" "batch_data_destination_bucket" { - bucket = "${local.batch_prefix}-data-destinations" + bucket = "immunisation-batch-prod-data-destinations" force_destroy = local.is_temp + tags = { + "Environment" = "prod" + "Project" = "immunisation" + "Service" = "fhir-api" + } } data "aws_iam_policy_document" "batch_data_destination_bucket_policy" { source_policy_documents = [ - local.environment == "prod" ? 
templatefile("${local.policy_path}/s3_batch_dest_policy_prod.json", { - "bucket-name" : aws_s3_bucket.batch_data_destination_bucket.bucket - } ): templatefile("${local.policy_path}/s3_batch_dest_policy.json", { + templatefile("${local.policy_path}/s3_batch_dest_policy.json", { "bucket-name" : aws_s3_bucket.batch_data_destination_bucket.bucket } ), ] @@ -34,4 +36,34 @@ resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_destinat sse_algorithm = "aws:kms" } } +} + +resource "aws_s3_bucket_lifecycle_configuration" "data_destinations" { + bucket = aws_s3_bucket.batch_data_destination_bucket.id + + rule { + id = "DeleteFilesFromForwardedFile" + status = "Enabled" + + filter { + prefix = "forwardedFile/" + } + + expiration { + days = 14 + } + } + + rule { + id = "DeleteFilesFromAckFolder" + status = "Enabled" + + filter { + prefix = "ack/" + } + + expiration { + days = 14 + } + } } \ No newline at end of file diff --git a/infra/prod/s3_source_bucket.tf b/infra/prod/s3_source_bucket.tf new file mode 100644 index 000000000..25ccfabff --- /dev/null +++ b/infra/prod/s3_source_bucket.tf @@ -0,0 +1,50 @@ +resource "aws_s3_bucket" "batch_data_source_bucket" { + bucket = "immunisation-batch-prod-data-sources" + force_destroy = local.is_temp + tags = { + "Environment" = "prod" + "Project" = "immunisation" + "Service" = "fhir-api" + } +} + +data "aws_iam_policy_document" "batch_data_source_bucket_policy" { + source_policy_documents = [ + templatefile("${local.policy_path}/s3_batch_source_policy.json", { + "bucket-name" : aws_s3_bucket.batch_data_source_bucket.bucket + } ), + ] +} + +resource "aws_s3_bucket_policy" "batch_data_source_bucket_policy" { + bucket = aws_s3_bucket.batch_data_source_bucket.id + policy = data.aws_iam_policy_document.batch_data_source_bucket_policy.json +} + +# resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_source_encryption" { +# bucket = aws_s3_bucket.batch_data_source_bucket.id + +# rule { +# apply_server_side_encryption_by_default { +# kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn +# sse_algorithm = "aws:kms" +# } +# } +# } + +resource "aws_s3_bucket_lifecycle_configuration" "datasources_lifecycle" { + bucket = "immunisation-batch-prod-data-sources" + + rule { + id = "DeleteFilesAfter7Days" + status = "Enabled" + + filter { + prefix = "*" + } + + expiration { + days = 7 + } + } +} \ No newline at end of file diff --git a/infra/prod/variables.tf b/infra/prod/variables.tf new file mode 100644 index 000000000..6bc67f010 --- /dev/null +++ b/infra/prod/variables.tf @@ -0,0 +1,30 @@ +data "aws_vpc" "default" { + default = true +} +data "aws_subnets" "default" { + filter { + name = "vpc-id" + values = [data.aws_vpc.default.id] + } +} +data "aws_route_tables" "default_route_tables" { + vpc_id = data.aws_vpc.default.id +} + +variable "aws_region" { + default = "eu-west-2" +} +variable "project_name" { + default = "immunisation-batch" +} +variable "project_short_name" { + default = "imms-batch" +} +locals { + environment = terraform.workspace + account_id = local.environment == "prod" ? 232116723729 : 603871901111 + local_account_id = local.environment == "prod" ? 
664418956997 : 345594581768 +} +data "aws_kms_key" "existing_s3_encryption_key" { + key_id = "alias/imms-batch-s3-shared-key" +} \ No newline at end of file diff --git a/manifest_template.yml b/manifest_template.yml index b3e6cc566..748c02e0b 100644 --- a/manifest_template.yml +++ b/manifest_template.yml @@ -91,12 +91,12 @@ APIGEE_ENVIRONMENTS: immunisation-fhir-api-prod: quota: enabled: true - limit: 300 + limit: 1200 interval: 1 timeunit: minute spikeArrest: enabled: true - ratelimit: 600pm # 10 requests per second + ratelimit: 1200pm # 20 requests per second app: quota: enabled: false diff --git a/recordprocessor/src/audit_table.py b/recordprocessor/src/audit_table.py new file mode 100644 index 000000000..5c34530b5 --- /dev/null +++ b/recordprocessor/src/audit_table.py @@ -0,0 +1,76 @@ +"""Update the audit table status for a file and look up queued files.""" + +import os +from typing import Union +from boto3.dynamodb.conditions import Key +from clients import dynamodb_client, dynamodb_resource, logger +from errors import UnhandledAuditTableError + + +# TODO update the function name in filename, ack lambda and ecs task +def update_audit_table_status(file_key: str) -> str: + """ + Update the status in the audit table and return the queue name for the file. + """ + try: + table_name = os.environ["AUDIT_TABLE_NAME"] + file_name_gsi = "filename_index" + file_name_response = dynamodb_resource.Table(table_name).query( + IndexName=file_name_gsi, KeyConditionExpression=Key("filename").eq(file_key) + ) + items = file_name_response.get("Items", []) + message_id = items[0].get("message_id") + queue_name = items[0].get("queue_name") + # Update the file's status in the audit table + dynamodb_client.update_item( + TableName=table_name, + Key={"message_id": {"S": message_id}}, + UpdateExpression="SET #status = :status", + ExpressionAttributeNames={"#status": "status"}, + ExpressionAttributeValues={":status": {"S": "Processed"}}, + ConditionExpression="attribute_exists(message_id)", + ) + logger.info( + "Status of file %s (message id %s) successfully updated in the audit table", + file_key, + message_id, + ) + return queue_name + except Exception as error: # pylint: disable = broad-exception-caught + error_message = f"Error updating the status of {file_key} in the audit table: {error}" + logger.error(error_message) + raise UnhandledAuditTableError(error_message) from error + + +def get_queued_file_details( + queue_name: str, +) -> tuple[Union[None, str], Union[None, str]]: + """ + Check for queued files and return the details of the oldest file queued for processing. + Returns a tuple in the format (file_name, message_id) for the oldest file. + Defaults to (None, None) if no file is found in 'Queued' status. + """ + table_name = os.environ["AUDIT_TABLE_NAME"] + queue_name_gsi = "queue_name_index" + + queue_response = dynamodb_resource.Table(table_name).query( + IndexName=queue_name_gsi, + KeyConditionExpression=Key("queue_name").eq(queue_name) + & Key("status").eq("Queued"), + ) + if queue_response["Items"]: + file_name, message_id = get_file_name(queue_response) + return file_name, message_id + else: + return None, None + + +def get_file_name(queue_response: dict) -> tuple[str, str]: + """ + Returns (file_name, message_id) for the oldest file.
+ """ + sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"]) + first_record = sorted_item[0] + file_name = first_record.get("filename") + message_id = first_record.get("message_id") + return file_name, message_id diff --git a/recordprocessor/src/batch_processing.py b/recordprocessor/src/batch_processing.py index 7c25ccff2..400b674ca 100644 --- a/recordprocessor/src/batch_processing.py +++ b/recordprocessor/src/batch_processing.py @@ -15,8 +15,6 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None: For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis, and documents the outcome for each row in the ack file. """ - logger.info("Event: %s", incoming_message_body) - try: interim_message_body = file_level_validation(incoming_message_body=incoming_message_body) except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught @@ -50,7 +48,7 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None: **details_from_processing, } - send_to_kinesis(supplier, outgoing_message_body) + send_to_kinesis(supplier, outgoing_message_body, vaccine.value) logger.info("Total rows processed: %s", row_count) diff --git a/recordprocessor/src/clients.py b/recordprocessor/src/clients.py index 762d5807c..7d4fb868e 100644 --- a/recordprocessor/src/clients.py +++ b/recordprocessor/src/clients.py @@ -1,7 +1,7 @@ """Initialise s3 and kinesis clients""" import logging -from boto3 import client as boto3_client +from boto3 import client as boto3_client, resource as boto3_resource REGION_NAME = "eu-west-2" @@ -9,6 +9,9 @@ kinesis_client = boto3_client("kinesis", region_name=REGION_NAME) sqs_client = boto3_client("sqs", region_name=REGION_NAME) firehose_client = boto3_client("firehose", region_name=REGION_NAME) +dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME) +dynamodb_resource = boto3_resource("dynamodb", region_name=REGION_NAME) +lambda_client = boto3_client('lambda', region_name=REGION_NAME) # Logger diff --git a/recordprocessor/src/errors.py b/recordprocessor/src/errors.py index bde724576..05575f8a5 100644 --- a/recordprocessor/src/errors.py +++ b/recordprocessor/src/errors.py @@ -7,3 +7,7 @@ class NoOperationPermissions(Exception): class InvalidHeaders(Exception): """A custom exception for when the file headers are invalid.""" + + +class UnhandledAuditTableError(Exception): + """A custom exception for when an unexpected error occurs whilst adding the file to the audit table.""" diff --git a/recordprocessor/src/file_level_validation.py b/recordprocessor/src/file_level_validation.py index 61019cc30..fea13f809 100644 --- a/recordprocessor/src/file_level_validation.py +++ b/recordprocessor/src/file_level_validation.py @@ -3,14 +3,23 @@ (validating headers and ensuring that the supplier has permission to perform at least one of the requested operations) """ +import os from constants import Constants from unique_permission import get_unique_action_flags_from_s3 -from clients import logger +from clients import logger, s3_client from make_and_upload_ack_file import make_and_upload_ack_file from mappings import Vaccine -from utils_for_recordprocessor import get_csv_content_dict_reader +from utils_for_recordprocessor import ( + get_csv_content_dict_reader, + invoke_filename_lambda, +) from errors import InvalidHeaders, NoOperationPermissions from logging_decorator import file_level_validation_logging_decorator +from audit_table import update_audit_table_status, get_queued_file_details + +# TODO Move 
to constants +SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME") +FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME") def validate_content_headers(csv_content_reader) -> None: @@ -36,12 +45,15 @@ def validate_action_flag_permissions( # Convert action flags into the expected operation names requested_permissions_set = { - f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested + f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" + for action in operations_requested } # Check if any of the CSV permissions match the allowed permissions if not requested_permissions_set.intersection(allowed_permissions_list): - raise NoOperationPermissions(f"{supplier} does not have permissions to perform any of the requested actions.") + raise NoOperationPermissions( + f"{supplier} does not have permissions to perform any of the requested actions." + ) logger.info( "%s permissions %s match one of the requested permissions required to %s", @@ -49,22 +61,31 @@ def validate_action_flag_permissions( allowed_permissions_list, requested_permissions_set, ) - return {perm.split("_")[1].upper() for perm in allowed_permissions_list if perm.startswith(vaccine_type)} + return { + perm.split("_")[1].upper() + for perm in allowed_permissions_list + if perm.startswith(vaccine_type) + } @file_level_validation_logging_decorator def file_level_validation(incoming_message_body: dict) -> None: """Validates that the csv headers are correct and that the supplier has permission to perform at least one of - the requested operations. Returns an interim message body for row level processing.""" + the requested operations. Returns an interim message body for row level processing. + """ try: message_id = incoming_message_body.get("message_id") vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum - vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper() + vaccine + for vaccine in Vaccine + if vaccine.value == incoming_message_body.get("vaccine_type").upper() ) supplier = incoming_message_body.get("supplier").upper() file_key = incoming_message_body.get("filename") permission = incoming_message_body.get("permission") - created_at_formatted_string = incoming_message_body.get("created_at_formatted_string") + created_at_formatted_string = incoming_message_body.get( + "created_at_formatted_string" + ) # Fetch the data csv_reader, csv_data = get_csv_content_dict_reader(file_key) @@ -73,14 +94,21 @@ def file_level_validation(incoming_message_body: dict) -> None: validate_content_headers(csv_reader) # Validate has permission to perform at least one of the requested actions - allowed_operations_set = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data) + allowed_operations_set = validate_action_flag_permissions( + supplier, vaccine.value, permission, csv_data + ) except (InvalidHeaders, NoOperationPermissions): - make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string) + make_and_upload_ack_file( + message_id, file_key, False, False, created_at_formatted_string + ) raise # Initialise the accumulated_ack_file_content with the headers - make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string) - + make_and_upload_ack_file( + message_id, file_key, True, True, created_at_formatted_string + ) + destination_key = f"processing/{file_key}" + move_file(SOURCE_BUCKET_NAME, file_key, destination_key) return { "message_id": message_id, "vaccine": 
vaccine, @@ -95,6 +123,33 @@ def file_level_validation(incoming_message_body: dict) -> None: # NOTE: The Exception may occur before the file_id, file_key and created_at_formatted_string are assigned message_id = message_id or "Unable to ascertain message_id" file_key = file_key or "Unable to ascertain file_key" - created_at_formatted_string = created_at_formatted_string or "Unable to ascertain created_at_formatted_string" - make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string) + created_at_formatted_string = ( + created_at_formatted_string + or "Unable to ascertain created_at_formatted_string" + ) + make_and_upload_ack_file( + message_id, file_key, False, False, created_at_formatted_string + ) + destination_key = f"archive/{file_key}" + move_file(SOURCE_BUCKET_NAME, file_key, destination_key) + # On failure, update the audit table status and trigger processing of the next queued file. + queue_name = update_audit_table_status(file_key) + file_key, message_id = get_queued_file_details(queue_name) + if file_key is not None and message_id is not None: + invoke_filename_lambda( + FILE_NAME_PROC_LAMBDA_NAME, SOURCE_BUCKET_NAME, file_key, message_id + ) raise + + +def move_file(bucket_name: str, source_key: str, destination_key: str) -> None: + """Move a file from one location to another in S3 by copying it and then deleting the original. Args: + bucket_name (str): Name of the S3 bucket. source_key (str): Source file key. + destination_key (str): Destination file key.""" + s3_client.copy_object( + Bucket=bucket_name, + CopySource={"Bucket": bucket_name, "Key": source_key}, + Key=destination_key, + ) + s3_client.delete_object(Bucket=bucket_name, Key=source_key) + logger.info("File moved from %s to %s", source_key, destination_key) diff --git a/recordprocessor/src/send_to_kinesis.py b/recordprocessor/src/send_to_kinesis.py index f3655a7fc..fe4f820f8 100644 --- a/recordprocessor/src/send_to_kinesis.py +++ b/recordprocessor/src/send_to_kinesis.py @@ -6,7 +6,7 @@ from clients import kinesis_client, logger -def send_to_kinesis(supplier: str, message_body: dict) -> bool: +def send_to_kinesis(supplier: str, message_body: dict, vaccine_type: str) -> bool: """Send a message to the specified Kinesis stream.
Returns a boolean indicating whether the send was successful.""" stream_name = os.getenv("KINESIS_STREAM_NAME") try: @@ -14,7 +14,7 @@ def send_to_kinesis(supplier: str, message_body: dict) -> bool: StreamName=stream_name, StreamARN=os.getenv("KINESIS_STREAM_ARN"), Data=json.dumps(message_body, ensure_ascii=False), - PartitionKey=supplier, + PartitionKey=f"{supplier}_{vaccine_type}", ) logger.info("Message sent to Kinesis stream: %s for supplier: %s with resp: %s", stream_name, supplier, resp) return True diff --git a/recordprocessor/src/unique_permission.py b/recordprocessor/src/unique_permission.py index 6b7637b4d..9c0c894ce 100644 --- a/recordprocessor/src/unique_permission.py +++ b/recordprocessor/src/unique_permission.py @@ -10,5 +10,4 @@ def get_unique_action_flags_from_s3(csv_data): df = pd.read_csv(StringIO(csv_data), delimiter='|', usecols=["ACTION_FLAG"]) # Get unique ACTION_FLAG values in one step unique_action_flags = set(df["ACTION_FLAG"].str.upper().unique()) - print(f"unique_action_flags:{unique_action_flags}") return unique_action_flags diff --git a/recordprocessor/src/utils_for_recordprocessor.py b/recordprocessor/src/utils_for_recordprocessor.py index dcdebac34..7c8c8a25f 100644 --- a/recordprocessor/src/utils_for_recordprocessor.py +++ b/recordprocessor/src/utils_for_recordprocessor.py @@ -1,9 +1,10 @@ """Utils for filenameprocessor lambda""" import os +import json from csv import DictReader from io import StringIO -from clients import s3_client +from clients import s3_client, lambda_client, logger def get_environment() -> str: @@ -23,3 +24,27 @@ def get_csv_content_dict_reader(file_key: str) -> DictReader: def create_diagnostics_dictionary(error_type, status_code, error_message) -> dict: """Returns a dictionary containing the error_type, statusCode, and error_message""" return {"error_type": error_type, "statusCode": status_code, "error_message": error_message} + + +def invoke_filename_lambda(file_name_processor, source_bucket_name, file_key, message_id): + """Asynchronously invoke the filename processor lambda with a synthetic S3 event for the given file.""" + try: + lambda_payload = {"Records": [ + { + "s3": { + "bucket": { + "name": source_bucket_name + }, + "object": { + "key": file_key + } + }, + "message_id": message_id} + ] + } + lambda_client.invoke( + FunctionName=file_name_processor, + InvocationType="Event", + Payload=json.dumps(lambda_payload)) + except Exception as error: + logger.error("Error invoking filename processor lambda %s for file %s: %s", file_name_processor, file_key, error) diff --git a/terraform/ack_lambda.tf b/terraform/ack_lambda.tf index ed16e3d6d..92fa48ba9 100644 --- a/terraform/ack_lambda.tf +++ b/terraform/ack_lambda.tf @@ -110,13 +110,35 @@ resource "aws_iam_policy" "ack_lambda_exec_policy" { Action = [ "s3:GetObject", "s3:PutObject", - "s3:ListBucket" + "s3:ListBucket", + "s3:CopyObject", + "s3:DeleteObject" ] Resource = [ + "arn:aws:s3:::immunisation-batch-${local.env}-data-sources", + "arn:aws:s3:::immunisation-batch-${local.env}-data-sources/*", "${data.aws_s3_bucket.existing_destination_bucket.arn}", "${data.aws_s3_bucket.existing_destination_bucket.arn}/*" ] }, + { + Effect = "Allow" + Action = "lambda:InvokeFunction" + Resource = [ + data.aws_lambda_function.existing_file_name_proc_lambda.arn, + ] + }, + { + Effect = "Allow" + Action = [ + "dynamodb:Query", + "dynamodb:UpdateItem" + ] + Resource = [ + "arn:aws:dynamodb:${var.aws_region}:${local.local_account_id}:table/${data.aws_dynamodb_table.audit-table.name}", + "arn:aws:dynamodb:${var.aws_region}:${local.local_account_id}:table/${data.aws_dynamodb_table.audit-table.name}/index/*", + ] + }, { Effect = "Allow", Action = [ @@ -137,6 +159,10 @@ resource
"aws_iam_policy" "ack_lambda_exec_policy" { }) } +resource "aws_cloudwatch_log_group" "ack_lambda_log_group" { + name = "/aws/lambda/${local.short_prefix}-ack-lambda" + retention_in_days = 30 +} resource "aws_iam_policy" "ack_s3_kms_access_policy" { name = "${local.short_prefix}-ack-s3-kms-policy" description = "Allow Lambda to decrypt environment variables" @@ -151,7 +177,9 @@ resource "aws_iam_policy" "ack_s3_kms_access_policy" { "kms:Decrypt", "kms:GenerateDataKey*" ] - Resource = data.aws_kms_key.existing_s3_encryption_key.arn + Resource = [data.aws_kms_key.existing_s3_encryption_key.arn, + data.aws_kms_key.existing_dynamo_encryption_key.arn + ] } ] }) @@ -185,10 +213,16 @@ resource "aws_lambda_function" "ack_processor_lambda" { variables = { ACK_BUCKET_NAME = data.aws_s3_bucket.existing_destination_bucket.bucket SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name + ENVIRONMENT = terraform.workspace + AUDIT_TABLE_NAME = "${data.aws_dynamodb_table.audit-table.name}" + FILE_NAME_PROC_LAMBDA_NAME = data.aws_lambda_function.existing_file_name_proc_lambda.function_name } } reserved_concurrent_executions = 20 + depends_on = [ + aws_cloudwatch_log_group.ack_lambda_log_group + ] } resource "aws_lambda_event_source_mapping" "sqs_to_lambda"{ diff --git a/terraform/api_gateway/api.tf b/terraform/api_gateway/api.tf index e8fe388e1..d949d4f3e 100644 --- a/terraform/api_gateway/api.tf +++ b/terraform/api_gateway/api.tf @@ -18,8 +18,8 @@ resource "aws_apigatewayv2_stage" "default" { default_route_settings { logging_level = "INFO" - throttling_burst_limit = 100 - throttling_rate_limit = 100 + throttling_burst_limit = 500 + throttling_rate_limit = 500 detailed_metrics_enabled = true } access_log_settings { diff --git a/terraform/api_gateway/logs.tf b/terraform/api_gateway/logs.tf index 925a7f4e9..700ffb3f7 100644 --- a/terraform/api_gateway/logs.tf +++ b/terraform/api_gateway/logs.tf @@ -1,6 +1,6 @@ resource "aws_cloudwatch_log_group" "api_access_log" { name = "/aws/vendedlogs/${aws_apigatewayv2_api.service_api.id}/${local.api_stage_name}" - retention_in_days = 7 + retention_in_days = 30 } resource "aws_api_gateway_account" "api_account" { diff --git a/terraform/delta.tf b/terraform/delta.tf index 8075526a1..cb62742ce 100644 --- a/terraform/delta.tf +++ b/terraform/delta.tf @@ -16,7 +16,7 @@ resource "aws_ecr_repository" "delta_lambda_repository" { module "delta_docker_image" { source = "terraform-aws-modules/lambda/aws//modules/docker-build" - + create_ecr_repo = false ecr_repo = "${local.prefix}-delta-lambda-repo" ecr_repo_lifecycle_policy = jsonencode({ @@ -42,6 +42,7 @@ module "delta_docker_image" { triggers = { dir_sha = local.delta_dir_sha } + } # Define the lambdaECRImageRetreival policy @@ -140,6 +141,10 @@ resource "aws_lambda_function" "delta_sync_lambda" { SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name } } + + depends_on = [ + aws_cloudwatch_log_group.delta_lambda + ] } @@ -163,4 +168,9 @@ resource "aws_sqs_queue" "dlq" { resource "aws_sns_topic" "delta_sns" { name = "${local.short_prefix}-${local.sns_name}" +} + +resource "aws_cloudwatch_log_group" "delta_lambda" { + name = "/aws/lambda/${local.short_prefix}-${local.function_name}" + retention_in_days = 30 } \ No newline at end of file diff --git a/terraform/ecs_batch_processor_config.tf b/terraform/ecs_batch_processor_config.tf index 0a4318558..c3be1d9ff 100644 --- a/terraform/ecs_batch_processor_config.tf +++ b/terraform/ecs_batch_processor_config.tf @@ -98,7 +98,9 @@ resource "aws_iam_policy" "ecs_task_exec_policy" { 
Action = [ "s3:GetObject", "s3:ListBucket", - "s3:PutObject" + "s3:PutObject", + "s3:CopyObject", + "s3:DeleteObject" ], Resource = [ "arn:aws:s3:::${local.batch_prefix}-data-sources", @@ -107,6 +109,17 @@ resource "aws_iam_policy" "ecs_task_exec_policy" { "${data.aws_s3_bucket.existing_destination_bucket.arn}/*" ] }, + { + Effect = "Allow" + Action = [ + "dynamodb:Query", + "dynamodb:UpdateItem" + ] + Resource = [ + "arn:aws:dynamodb:${var.aws_region}:${local.local_account_id}:table/${data.aws_dynamodb_table.audit-table.name}", + "arn:aws:dynamodb:${var.aws_region}:${local.local_account_id}:table/${data.aws_dynamodb_table.audit-table.name}/index/*", + ] + }, { Effect = "Allow" Action = [ @@ -116,7 +129,8 @@ resource "aws_iam_policy" "ecs_task_exec_policy" { ] Resource = [ data.aws_kms_key.existing_s3_encryption_key.arn, - data.aws_kms_key.existing_kinesis_encryption_key.arn + data.aws_kms_key.existing_kinesis_encryption_key.arn, + data.aws_kms_key.existing_dynamo_encryption_key.arn ] }, { @@ -134,6 +148,13 @@ resource "aws_iam_policy" "ecs_task_exec_policy" { ], Resource = "arn:aws:ecr:${var.aws_region}:${local.local_account_id}:repository/${local.short_prefix}-processing-repo" }, + { + Effect = "Allow" + Action = "lambda:InvokeFunction" + Resource = [ + "${data.aws_lambda_function.existing_file_name_proc_lambda.arn}" + ] + }, { "Effect" : "Allow", "Action" : [ @@ -153,6 +174,7 @@ resource "aws_iam_role_policy_attachment" "ecs_task_exec_policy_attachment" { resource "aws_cloudwatch_log_group" "ecs_task_log_group" { name = "/aws/vendedlogs/ecs/${local.short_prefix}-processor-task" + retention_in_days = 30 } # Create the ECS Task Definition @@ -193,6 +215,14 @@ resource "aws_ecs_task_definition" "ecs_task" { { name = "SPLUNK_FIREHOSE_NAME" value = module.splunk.firehose_stream_name + }, + { + name = "AUDIT_TABLE_NAME" + value = "${data.aws_dynamodb_table.audit-table.name}" + }, + { + name = "FILE_NAME_PROC_LAMBDA_NAME" + value = "${data.aws_lambda_function.existing_file_name_proc_lambda.function_name}" } ] logConfiguration = { @@ -326,4 +356,5 @@ resource "aws_pipes_pipe" "fifo_pipe" { # Custom Log Group resource "aws_cloudwatch_log_group" "pipe_log_group" { name = "/aws/vendedlogs/pipes/${local.short_prefix}-pipe-logs" + retention_in_days = 30 } diff --git a/terraform/file_name_processor.tf b/terraform/file_name_processor.tf index afede93bf..e7d1c4fdb 100644 --- a/terraform/file_name_processor.tf +++ b/terraform/file_name_processor.tf @@ -109,7 +109,10 @@ resource "aws_iam_policy" "filenameprocessor_lambda_exec_policy" { Effect = "Allow" Action = [ "s3:GetObject", - "s3:ListBucket" + "s3:ListBucket", + "s3:PutObject", + "s3:CopyObject", + "s3:DeleteObject" ] Resource = [ "arn:aws:s3:::${local.batch_prefix}-data-sources", @@ -156,6 +159,13 @@ resource "aws_iam_policy" "filenameprocessor_lambda_exec_policy" { "firehose:PutRecordBatch" ], "Resource": "arn:aws:firehose:*:*:deliverystream/${module.splunk.firehose_stream_name}" + }, + { + Effect = "Allow" + Action = "lambda:InvokeFunction" + Resource = [ + "arn:aws:lambda:${var.aws_region}:${local.local_account_id}:function:imms-${local.env}-filenameproc_lambda", + ] } ] }) @@ -279,10 +289,17 @@ resource "aws_lambda_function" "file_processor_lambda" { SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name AUDIT_TABLE_NAME = "${data.aws_dynamodb_table.audit-table.name}" FILE_NAME_GSI = "filename_index" + FILE_NAME_PROC_LAMBDA_NAME = "imms-${local.env}-filenameproc_lambda" + } } kms_key_arn = data.aws_kms_key.existing_lambda_encryption_key.arn 
reserved_concurrent_executions = 20 + depends_on = [ + aws_cloudwatch_log_group.file_name_processor_log_group, + aws_iam_policy.filenameprocessor_lambda_exec_policy + ] + } @@ -302,6 +319,7 @@ resource "aws_s3_bucket_notification" "datasources_lambda_notification" { lambda_function { lambda_function_arn = aws_lambda_function.file_processor_lambda.arn events = ["s3:ObjectCreated:*"] + #filter_prefix ="" } } @@ -398,4 +416,9 @@ resource "aws_iam_policy" "elasticache_permissions" { resource "aws_iam_role_policy_attachment" "elasticache_policy_attachment" { role = aws_iam_role.elasticache_exec_role.name policy_arn = aws_iam_policy.elasticache_permissions.arn +} + +resource "aws_cloudwatch_log_group" "file_name_processor_log_group" { + name = "/aws/lambda/${local.short_prefix}-filenameproc_lambda" + retention_in_days = 30 } \ No newline at end of file diff --git a/terraform/forwarder_lambda.tf b/terraform/forwarder_lambda.tf index 93e936c51..723b58ea2 100644 --- a/terraform/forwarder_lambda.tf +++ b/terraform/forwarder_lambda.tf @@ -212,8 +212,10 @@ resource "aws_lambda_function" "forwarding_lambda" { } kms_key_arn = data.aws_kms_key.existing_lambda_encryption_key.arn depends_on = [ - aws_iam_role_policy_attachment.forwarding_lambda_exec_policy_attachment + aws_iam_role_policy_attachment.forwarding_lambda_exec_policy_attachment, + aws_cloudwatch_log_group.forwarding_lambda_log_group ] + reserved_concurrent_executions = 20 } @@ -226,4 +228,8 @@ resource "aws_lambda_function" "forwarding_lambda" { depends_on = [aws_lambda_function.forwarding_lambda] } - \ No newline at end of file + + resource "aws_cloudwatch_log_group" "forwarding_lambda_log_group" { + name = "/aws/lambda/${local.short_prefix}-forwarding_lambda" + retention_in_days = 30 +} \ No newline at end of file diff --git a/terraform/lambda/lambda.tf b/terraform/lambda/lambda.tf index eb73613ea..23e9b944b 100644 --- a/terraform/lambda/lambda.tf +++ b/terraform/lambda/lambda.tf @@ -5,7 +5,7 @@ module "lambda_function_container_image" { lambda_role = aws_iam_role.lambda_role.arn function_name = "${var.short_prefix}_${var.function_name}" handler = "${var.function_name}_handler.${var.function_name}_handler" - + cloudwatch_logs_retention_in_days = 30 create_package = false image_uri = var.image_uri package_type = "Image" @@ -46,3 +46,5 @@ resource "aws_cloudwatch_log_metric_filter" "max_memory_used_metric" { value = "$18" } } + + diff --git a/terraform/s3_config.tf b/terraform/s3_config.tf index 1a324e342..591dcdbc9 100644 --- a/terraform/s3_config.tf +++ b/terraform/s3_config.tf @@ -19,20 +19,37 @@ resource "aws_s3_bucket_policy" "batch_data_source_bucket_policy" { policy = data.aws_iam_policy_document.batch_data_source_bucket_policy.json } -resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_source_encryption" { - bucket = aws_s3_bucket.batch_data_source_bucket.id +# resource "aws_s3_bucket_server_side_encryption_configuration" "s3_batch_source_encryption" { +# bucket = aws_s3_bucket.batch_data_source_bucket.id - rule { - apply_server_side_encryption_by_default { - kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn - sse_algorithm = "aws:kms" - } - } -} +# rule { +# apply_server_side_encryption_by_default { +# kms_master_key_id = data.aws_kms_key.existing_s3_encryption_key.arn +# sse_algorithm = "aws:kms" +# } +# } +# } resource "aws_s3_bucket_versioning" "source_versioning" { bucket = aws_s3_bucket.batch_data_source_bucket.id versioning_configuration { status = "Enabled" } +} + +resource 
"aws_s3_bucket_lifecycle_configuration" "datasources_lifecycle" { + bucket = "${local.batch_prefix}-data-sources" + + rule { + id = "DeleteFinalFilesAfter7Days" + status = "Enabled" + + filter { + prefix = "archive/" + } + + expiration { + days = 7 + } + } } \ No newline at end of file diff --git a/terraform/splunk/firehose.tf b/terraform/splunk/firehose.tf index 457fb0d5d..c312679dd 100644 --- a/terraform/splunk/firehose.tf +++ b/terraform/splunk/firehose.tf @@ -27,6 +27,7 @@ resource "aws_kinesis_firehose_delivery_stream" "splunk_firehose_stream" { resource "aws_cloudwatch_log_group" "fire_house_logs" { name = "${local.prefix}-firehose-logs" + retention_in_days = 30 } resource "aws_cloudwatch_log_stream" "splunk_logs_stream" { diff --git a/terraform/variables.tf b/terraform/variables.tf index b52c1b9db..384cb6f66 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -79,6 +79,10 @@ data "aws_s3_bucket" "existing_destination_bucket" { bucket = "immunisation-batch-${local.local_config}-data-destinations" } +data "aws_s3_bucket" "existing_source_bucket" { + bucket = "immunisation-batch-${local.local_config}-data-sources" +} + data "aws_kms_key" "existing_lambda_encryption_key" { key_id = "alias/imms-batch-lambda-env-encryption" } @@ -99,3 +103,7 @@ data "aws_dynamodb_table" "delta-dynamodb-table" { name = "imms-${local.local_config}-delta" } +data "aws_lambda_function" "existing_file_name_proc_lambda" { + function_name = aws_lambda_function.file_processor_lambda.function_name +} + diff --git a/terraform_aws_backup/aws-backup-destination/main.tf b/terraform_aws_backup/aws-backup-destination/main.tf index e0cb87245..2ea9bf17c 100644 --- a/terraform_aws_backup/aws-backup-destination/main.tf +++ b/terraform_aws_backup/aws-backup-destination/main.tf @@ -30,11 +30,11 @@ data "aws_region" "current" {} data "aws_caller_identity" "current" {} data "aws_ssm_parameter" "src_acct_id" { - name = "/imms/awsbackup/dev/sourceacctid" + name = "/imms/awsbackup/sourceacctid" } data "aws_ssm_parameter" "src_acct_name" { - name = "/imms/awsbackup/dev/sourceacctname" + name = "/imms/awsbackup/sourceacctname" } module "destination" { @@ -46,7 +46,7 @@ module "destination" { } locals { - environment = terraform.workspace + environment = "prod" destination_account_id = data.aws_caller_identity.current.account_id } diff --git a/terraform_aws_backup/aws-backup-destination/modules/aws_config/backup_vault_policy.tf b/terraform_aws_backup/aws-backup-destination/modules/aws_config/backup_vault_policy.tf index a408fdd18..9047f6bc8 100644 --- a/terraform_aws_backup/aws-backup-destination/modules/aws_config/backup_vault_policy.tf +++ b/terraform_aws_backup/aws-backup-destination/modules/aws_config/backup_vault_policy.tf @@ -20,26 +20,26 @@ data "aws_iam_policy_document" "vault_policy" { resources = ["*"] } - # dynamic "statement" { - # for_each = var.enable_vault_protection ? [1] : [] - # content { - # sid = "DenyBackupVaultAccess" - # effect = "Deny" + dynamic "statement" { + for_each = var.enable_vault_protection ? 
[1] : [] + content { + sid = "DenyBackupVaultAccess" + effect = "Deny" - # principals { - # type = "AWS" - # identifiers = ["*"] - # } - # actions = [ - # "backup:DeleteRecoveryPoint", - # "backup:UpdateRecoveryPointLifecycle", - # "backup:DeleteBackupVault", - # "backup:StartRestoreJob", - # "backup:DeleteBackupVaultLockConfiguration", - # ] - # resources = ["*"] - # } - # } + principals { + type = "AWS" + identifiers = ["*"] + } + actions = [ + "backup:DeleteRecoveryPoint", + "backup:UpdateRecoveryPointLifecycle", + "backup:DeleteBackupVault", + "backup:StartRestoreJob", + "backup:DeleteBackupVaultLockConfiguration", + ] + resources = ["*"] + } + } dynamic "statement" { for_each = var.enable_vault_protection ? [1] : [] diff --git a/terraform_aws_backup/aws-backup-destination/modules/aws_config/kms_key.tf b/terraform_aws_backup/aws-backup-destination/modules/aws_config/kms_key.tf index 34f3e27d5..9f0459129 100644 --- a/terraform_aws_backup/aws-backup-destination/modules/aws_config/kms_key.tf +++ b/terraform_aws_backup/aws-backup-destination/modules/aws_config/kms_key.tf @@ -19,6 +19,6 @@ resource "aws_kms_key" "destination_backup_key" { } resource "aws_kms_alias" "destination_backup_key" { - name = "alias/imms-bkp-encryption" + name = "alias/${local.environment}/imms-bkp-encryption" target_key_id = aws_kms_key.destination_backup_key.key_id } \ No newline at end of file diff --git a/terraform_aws_backup/aws-backup-destination/modules/aws_config/variables.tf b/terraform_aws_backup/aws-backup-destination/modules/aws_config/variables.tf index 740a731ef..99517426c 100644 --- a/terraform_aws_backup/aws-backup-destination/modules/aws_config/variables.tf +++ b/terraform_aws_backup/aws-backup-destination/modules/aws_config/variables.tf @@ -36,7 +36,7 @@ variable "enable_vault_protection" { # have its policy changed. The minimum and maximum retention periods are also set only if this is true. description = "Flag which controls if the vault lock is enabled" type = bool - default = false + default = true } variable "vault_lock_type" { @@ -45,23 +45,23 @@ variable "vault_lock_type" { # See toplevel README.md: # DO NOT SET THIS TO compliance UNTIL YOU ARE SURE THAT YOU WANT TO LOCK THE VAULT PERMANENTLY # When you do, you will also need to set "enable_vault_protection" to true for it to take effect. - default = "governance" + default = "compliance" } variable "vault_lock_min_retention_days" { description = "The minimum retention period that the vault retains its recovery points" type = number - default = 60 + default = 30 } variable "vault_lock_max_retention_days" { description = "The maximum retention period that the vault retains its recovery points" type = number - default = 120 + default = 60 } variable "changeable_for_days" { description = "How long you want the vault lock to be changeable for, only applies to compliance mode. This value is expressed in days no less than 3 and no greater than 36,500; otherwise, an error will return." 
type = number - default = 36500 + default = 30 } diff --git a/terraform_aws_backup/aws-backup-source/main.tf b/terraform_aws_backup/aws-backup-source/main.tf index acccb093c..4076a2aa6 100644 --- a/terraform_aws_backup/aws-backup-source/main.tf +++ b/terraform_aws_backup/aws-backup-source/main.tf @@ -52,7 +52,7 @@ module "source" { backup_copy_vault_account_id = local.destination_account_id backup_copy_vault_arn = data.aws_arn.destination_vault_arn.arn notifications_target_email_address = data.aws_ssm_parameter.notified_email.value - environment_name = terraform.workspace + environment_name = "prod" project_name = "imms-fhir-api-" terraform_role_arn = "arn:aws:iam::${local.source_account_id}:role/${local.assume_role}" source_account_id = data.aws_caller_identity.current.account_id @@ -64,13 +64,13 @@ module "source" { "rules" : [ { "copy_action" : { - "delete_after" : 4 + "delete_after" : 31 }, "lifecycle" : { - "delete_after" : 2 + "delete_after" : 4 }, - "name" : "daily_kept_for_2_days", - "schedule" : "cron(40 15 * * ? *)" + "name" : "daily_kept_for_4_days", + "schedule" : "cron(00 20 * * ? *)" } ], "selection_tag" : "NHSE-Enable-S3-Backup" @@ -84,13 +84,13 @@ module "source" { "rules" : [ { "copy_action" : { - "delete_after" : 4 + "delete_after" : 31 }, "lifecycle" : { - "delete_after" : 2 + "delete_after" : 4 }, - "name" : "daily_kept_for_2_days", - "schedule" : "cron(40 15 * * ? *)" + "name" : "daily_kept_for_4_days", + "schedule" : "cron(00 20 * * ? *)" } ], "selection_tag" : "NHSE-Enable-Dynamo-Backup" diff --git a/terraform_aws_backup/aws-backup-source/modules/aws_config/s3_report_config.tf b/terraform_aws_backup/aws-backup-source/modules/aws_config/s3_report_config.tf index 8b5eb0fd1..9209103a5 100644 --- a/terraform_aws_backup/aws-backup-source/modules/aws_config/s3_report_config.tf +++ b/terraform_aws_backup/aws-backup-source/modules/aws_config/s3_report_config.tf @@ -1,6 +1,6 @@ # First, we create an S3 bucket for compliance reports. resource "aws_s3_bucket" "backup_reports" { - bucket = "${var.project_name}backup-reports" + bucket = "${var.project_name}${var.environment_name}-backup-reports" } resource "aws_s3_bucket_public_access_block" "backup_reports" { diff --git a/terraform_aws_backup/aws-backup-source/modules/aws_config/variables.tf b/terraform_aws_backup/aws-backup-source/modules/aws_config/variables.tf index 59ec8c949..182708b0b 100644 --- a/terraform_aws_backup/aws-backup-source/modules/aws_config/variables.tf +++ b/terraform_aws_backup/aws-backup-source/modules/aws_config/variables.tf @@ -6,6 +6,7 @@ variable "project_name" { variable "environment_name" { description = "The name of the environment where AWS Backup is configured." type = string + default = "prod" } variable "source_account_id" { @@ -38,7 +39,7 @@ variable "restore_testing_plan_start_window" { variable "restore_testing_plan_scheduled_expression" { description = "Scheduled Expression of Recovery Selection Point" type = string - default = "cron(0 1 ? * SUN *)" + default = "cron(30 1 ? * SUN *)" } variable "restore_testing_plan_recovery_point_types" {
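Note on the audit-table handoff added in this change (recordprocessor/src/audit_table.py together with the failure path in file_level_validation.py): when a file finishes or fails file-level validation, its audit row is set to "Processed" and the oldest file still in "Queued" status for the same queue_name is re-submitted to the filename-processor lambda. The following is a minimal, illustrative sketch of that query-and-requeue step, assuming the environment variables and index names used in the diff (AUDIT_TABLE_NAME, SOURCE_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME, queue_name_index); it is not part of the change itself and collapses get_queued_file_details, get_file_name and invoke_filename_lambda into a single helper purely for illustration.

import json
import os

import boto3
from boto3.dynamodb.conditions import Key

dynamodb_resource = boto3.resource("dynamodb", region_name="eu-west-2")
lambda_client = boto3.client("lambda", region_name="eu-west-2")


def requeue_oldest_queued_file(queue_name: str) -> None:
    """Find the oldest 'Queued' audit row for a queue and re-trigger the filename processor (sketch only)."""
    table = dynamodb_resource.Table(os.environ["AUDIT_TABLE_NAME"])
    response = table.query(
        IndexName="queue_name_index",
        KeyConditionExpression=Key("queue_name").eq(queue_name) & Key("status").eq("Queued"),
    )
    items = response.get("Items", [])
    if not items:
        return  # nothing is waiting on this queue
    # Oldest first, mirroring get_file_name's sort on the timestamp attribute
    oldest = min(items, key=lambda item: item["timestamp"])
    payload = {
        "Records": [
            {
                "s3": {
                    "bucket": {"name": os.environ["SOURCE_BUCKET_NAME"]},
                    "object": {"key": oldest["filename"]},
                },
                "message_id": oldest["message_id"],
            }
        ]
    }
    lambda_client.invoke(
        FunctionName=os.environ["FILE_NAME_PROC_LAMBDA_NAME"],
        InvocationType="Event",  # asynchronous invoke, matching invoke_filename_lambda in the diff
        Payload=json.dumps(payload),
    )

In the actual change this logic runs inside the file-level validation failure handler, so a failed file is archived and immediately frees its queue for the next queued upload.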