Merge pull request #298 from NHSDigital/AMB-2343_ECS_logging_for_inf_ack
AMB-2343 ECS logging for inf ack
AlexandraBenson authored Dec 30, 2024
2 parents f1a4579 + f835441 commit 5dc5eda
Showing 21 changed files with 719 additions and 429 deletions.
108 changes: 0 additions & 108 deletions e2e/test_batch_processing.py

This file was deleted.

2 changes: 1 addition & 1 deletion filenameprocessor/src/audit_table.py
@@ -28,7 +28,7 @@ def add_to_audit_table(message_id: str, file_key: str, created_at_formatted_str:
Item={
"message_id": {"S": message_id},
"filename": {"S": file_key},
"status": {"S": "Processed"},
"status": {"S": "Not processed - duplicate" if duplicate_exists else "Processed"},
"timestamp": {"S": created_at_formatted_str},
},
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
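For context, the status written to DynamoDB now depends on a duplicate_exists flag. A minimal sketch of the surrounding call, assuming a boto3 DynamoDB client and a hypothetical table name (only the Item and ConditionExpression shapes come from the diff above):

import boto3

dynamodb_client = boto3.client("dynamodb", region_name="eu-west-2")
AUDIT_TABLE_NAME = "batch-audit-table"  # hypothetical table name


def add_to_audit_table(message_id: str, file_key: str, created_at_formatted_string: str, duplicate_exists: bool) -> None:
    """Record the incoming file in the audit table, flagging duplicates as not processed."""
    dynamodb_client.put_item(
        TableName=AUDIT_TABLE_NAME,
        Item={
            "message_id": {"S": message_id},
            "filename": {"S": file_key},
            "status": {"S": "Not processed - duplicate" if duplicate_exists else "Processed"},
            "timestamp": {"S": created_at_formatted_string},
        },
        ConditionExpression="attribute_not_exists(message_id)",  # prevents accidental overwrites
    )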
2 changes: 1 addition & 1 deletion filenameprocessor/src/file_name_processor.py
@@ -77,7 +77,7 @@ def handle_record(record) -> dict:
UnhandledSqsError,
Exception,
) as error:
logger.error("Error processing file'%s': %s", file_key, str(error))
logger.error("Error processing file '%s': %s", file_key, str(error))

# Create ack file
# (note that error may have occurred before message_id and created_at_formatted_string were generated)
6 changes: 4 additions & 2 deletions filenameprocessor/src/logging_decorator.py
@@ -32,11 +32,13 @@ def generate_and_send_logs(


def logging_decorator(func):
"""Sends the appropriate logs to Cloudwatch and Firehose based on the function result.
"""
Sends the appropriate logs to Cloudwatch and Firehose based on the function result.
NOTE: The function must return a dictionary as its only return value. The dictionary is expected to contain
all of the required additional details for logging.
NOTE: Logs will include the result of the function call or, in the case of an Exception being raised,
a status code of 500 and the error message."""
a status code of 500 and the error message.
"""

@wraps(func)
def wrapper(*args, **kwargs):
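The docstring above pins down the decorator's contract: the wrapped function must return a dict of log details, and any exception is logged as a 500 with the error message. A self-contained sketch of that contract (the real generate_and_send_logs signature is not visible in this diff, so a simplified stand-in is used):

import json
import logging
import time
from functools import wraps

logger = logging.getLogger()


def generate_and_send_logs(log_data: dict) -> None:
    """Simplified stand-in: the real helper also forwards the log to Firehose."""
    logger.info(json.dumps(log_data))


def logging_decorator(func):
    """Log the wrapped function's result dict, or a 500 plus the error message on failure."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            result = func(*args, **kwargs)  # contract: must return a dict of log details
            generate_and_send_logs({"time_taken": f"{round(time.time() - start, 5)}s", **result})
            return result
        except Exception as error:
            # contract: exceptions are logged with a 500 status code and the message
            generate_and_send_logs({"statusCode": 500, "error": str(error)})
            raise

    return wrapper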
124 changes: 37 additions & 87 deletions recordprocessor/src/batch_processing.py
@@ -3,15 +3,11 @@
import json
import os
import time
from constants import Constants
from utils_for_recordprocessor import get_csv_content_dict_reader
from unique_permission import get_unique_action_flags_from_s3
from make_and_upload_ack_file import make_and_upload_ack_file
from get_operation_permissions import get_operation_permissions
from process_row import process_row
from mappings import Vaccine
from send_to_kinesis import send_to_kinesis
from clients import logger
from file_level_validation import file_level_validation
from errors import NoOperationPermissions, InvalidHeaders


def process_csv_to_fhir(incoming_message_body: dict) -> None:
@@ -20,91 +16,45 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
and documents the outcome for each row in the ack file.
"""
logger.info("Event: %s", incoming_message_body)
# Get details needed to process file
file_id = incoming_message_body.get("message_id")
vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum
vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper()
)
supplier = incoming_message_body.get("supplier").upper()
file_key = incoming_message_body.get("filename")
permission = incoming_message_body.get("permission")
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
allowed_operations = get_operation_permissions(vaccine, permission)

# Fetch the data
bucket_name = os.getenv("SOURCE_BUCKET_NAME")
csv_reader, csv_data = get_csv_content_dict_reader(bucket_name, file_key)
is_valid_headers = validate_content_headers(csv_reader)

# Validate has permission to perform at least one of the requested actions
action_flag_check = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data)

if not action_flag_check or not is_valid_headers:
make_and_upload_ack_file(file_id, file_key, False, False, created_at_formatted_string)
else:
# Initialise the accumulated_ack_file_content with the headers
make_and_upload_ack_file(file_id, file_key, True, True, created_at_formatted_string)

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)
# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(vaccine, allowed_operations, row)

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine.value,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}

send_to_kinesis(supplier, outgoing_message_body)
try:
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
# If the file is invalid, processing should cease immediately
return None

file_id = interim_message_body.get("message_id")
vaccine = interim_message_body.get("vaccine")
supplier = interim_message_body.get("supplier")
file_key = interim_message_body.get("file_key")
allowed_operations = interim_message_body.get("allowed_operations")
created_at_formatted_string = interim_message_body.get("created_at_formatted_string")
csv_reader = interim_message_body.get("csv_dict_reader")

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)

# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(vaccine, allowed_operations, row)

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine.value,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}

send_to_kinesis(supplier, outgoing_message_body)

logger.info("Total rows processed: %s", row_count)


def validate_content_headers(csv_content_reader):
"""Returns a bool to indicate whether the given CSV headers match the 34 expected headers exactly"""
return csv_content_reader.fieldnames == Constants.expected_csv_headers


def validate_action_flag_permissions(
supplier: str, vaccine_type: str, allowed_permissions_list: list, csv_data
) -> bool:
"""
Returns True if the supplier has permission to perform ANY of the requested actions for the given vaccine type,
else False.
"""
# If the supplier has full permissions for the vaccine type, return True
if f"{vaccine_type}_FULL" in allowed_permissions_list:
return True

# Get unique ACTION_FLAG values from the S3 file
operations_requested = get_unique_action_flags_from_s3(csv_data)

# Convert action flags into the expected operation names
requested_permissions_set = {
f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
}

# Check if any of the CSV permissions match the allowed permissions
if requested_permissions_set.intersection(allowed_permissions_list):
logger.info(
"%s permissions %s match one of the requested permissions required to %s",
supplier,
allowed_permissions_list,
requested_permissions_set,
)
return True

return False
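
A worked example of the action-flag permission check being removed here (the values below are hypothetical; the mapping and intersection logic come from the code above):

# Hypothetical values illustrating validate_action_flag_permissions
vaccine_type = "FLU"
allowed_permissions_list = ["FLU_CREATE"]   # supplier's configured permissions
operations_requested = {"NEW", "UPDATE"}    # unique ACTION_FLAG values found in the CSV

# "NEW" maps to the CREATE operation; other flags are used as-is
requested_permissions_set = {
    f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
}
assert requested_permissions_set == {"FLU_CREATE", "FLU_UPDATE"}

# Any overlap with the allowed permissions lets the file proceed
assert requested_permissions_set.intersection(allowed_permissions_list)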


def main(event: str) -> None:
"""Process each row of the file"""
logger.info("task started")
@@ -114,7 +64,7 @@ def main(event: str) -> None:
except Exception as error: # pylint: disable=broad-exception-caught
logger.error("Error processing message: %s", error)
end = time.time()
logger.info(f"Total time for completion:{round(end - start, 5)}s")
logger.info("Total time for completion: %ss", round(end - start, 5))


if __name__ == "__main__":
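The refactored process_csv_to_fhir leans on file_level_validation, which is not shown in this excerpt. A hypothetical sketch of its contract, with the returned keys taken from the .get() calls above and everything else assumed:

def file_level_validation(incoming_message_body: dict) -> dict:
    """Sketch only: validate headers and permissions, upload the initial ack file,
    and return the details the row loop needs. Assumed to raise InvalidHeaders or
    NoOperationPermissions when the file cannot be processed."""
    # ... header validation, permission checks and ack upload would happen here ...
    return {
        "message_id": incoming_message_body.get("message_id"),
        "vaccine": ...,                    # Vaccine enum member (supports vaccine.value)
        "supplier": ...,                   # upper-cased supplier code
        "file_key": incoming_message_body.get("filename"),
        "allowed_operations": ...,         # operations the supplier may perform
        "created_at_formatted_string": incoming_message_body.get("created_at_formatted_string"),
        "csv_dict_reader": ...,            # csv.DictReader over the file contents
    }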
5 changes: 5 additions & 0 deletions recordprocessor/src/clients.py
@@ -7,6 +7,11 @@

s3_client = boto3_client("s3", region_name=REGION_NAME)
kinesis_client = boto3_client("kinesis", region_name=REGION_NAME)
sqs_client = boto3_client("sqs", region_name=REGION_NAME)
firehose_client = boto3_client("firehose", region_name=REGION_NAME)


# Logger
logging.basicConfig(level="INFO")
logger = logging.getLogger()
logger.setLevel("INFO")
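
The new sqs_client and firehose_client presumably give the record processor the same log-shipping path the filename processor's logging decorator describes (CloudWatch plus Firehose). An illustrative Firehose call, where the stream name and payload shape are assumptions:

import json

from clients import firehose_client, logger

log_data = {"function_name": "process_csv_to_fhir", "statusCode": 200}
logger.info(json.dumps(log_data))  # CloudWatch via the module-level logger
firehose_client.put_record(
    DeliveryStreamName="immunisation-batch-splunk-firehose",  # hypothetical stream name
    Record={"Data": json.dumps(log_data).encode("utf-8")},
)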
9 changes: 9 additions & 0 deletions recordprocessor/src/errors.py
@@ -0,0 +1,9 @@
"""Custom exceptions for the Record Processor."""


class NoOperationPermissions(Exception):
"""A custom exception for when the supplier has no permissions for any of the requested operations."""


class InvalidHeaders(Exception):
"""A custom exception for when the file headers are invalid."""