Skip to content

Commit

Permalink
Merge pull request #330 from NHSDigital/AMB-2382_audit_table_and_file…
Browse files Browse the repository at this point in the history
…_movement_refactor

AMB-2382_audit_table_and_file_movement_refactor
  • Loading branch information
AlexandraBenson authored Jan 31, 2025
2 parents e570f39 + 55e3341 commit 4014a9a
Show file tree
Hide file tree
Showing 53 changed files with 1,742 additions and 1,714 deletions.
54 changes: 26 additions & 28 deletions ack_backend/src/ack_processor.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
"""Ack lambda handler"""

import json
from utils_for_ack_lambda import get_environment
from typing import Union
from logging_decorators import ack_lambda_handler_logging_decorator, convert_messsage_to_ack_row_logging_decorator
from update_ack_file import update_ack_file, create_ack_data
from clients import s3_client


ENVIRONMENT = get_environment()
SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources"
def get_error_message_for_ack_file(message_diagnostics) -> Union[None, str]:
    """Determines and returns the error message to be displayed in the ack file"""
    if message_diagnostics is None:
        # No diagnostics means the row processed successfully: no error message.
        return None

    if isinstance(message_diagnostics, dict):
        status_code = message_diagnostics.get("statusCode")
        # A missing status code or a 500 both indicate an unhandled failure.
        if status_code in (None, 500):
            return "An unhandled error occurred during batch processing"
        return message_diagnostics.get("error_message", "Unable to determine diagnostics issue")

    # Diagnostics present but not a dict: cannot interpret them.
    return "Unable to determine diagnostics issue"


@convert_messsage_to_ack_row_logging_decorator
def convert_message_to_ack_row(message, created_at_formatted_string):
Expand All @@ -18,24 +27,13 @@ def convert_message_to_ack_row(message, created_at_formatted_string):
A value error is raised if the file_key or created_at_formatted_string for the message do not match the
expected values.
"""
error_message_for_ack_file: Union[None, str]
if (diagnostics := message.get("diagnostics")) is None:
error_message_for_ack_file = None
elif isinstance(diagnostics, dict):
status_code = diagnostics.get("statusCode")
if status_code is None or status_code == 500:
error_message_for_ack_file = "An unhandled error occurred during batch processing"
else:
error_message_for_ack_file = diagnostics.get("error_message", "Unable to determine diagnostics issue")
else:
error_message_for_ack_file = "Unable to determine diagnostics issue"

diagnostics = message.get("diagnostics")
return create_ack_data(
created_at_formatted_string=created_at_formatted_string,
local_id=message.get("local_id"),
row_id=message.get("row_id"),
successful_api_response=diagnostics is None, # Response is successful if and only if there are no diagnostics
diagnostics=error_message_for_ack_file,
diagnostics=get_error_message_for_ack_file(diagnostics),
imms_id=message.get("imms_id"),
)

Expand All @@ -53,9 +51,11 @@ def lambda_handler(event, context):

file_key = None
created_at_formatted_string = None
message_id = None
supplier_queue = None

ack_data_rows = []

array_of_rows = []

for i, record in enumerate(event["Records"]):

try:
Expand All @@ -67,17 +67,15 @@ def lambda_handler(event, context):
# IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
# for all messages in the event. The use of FIFO SQS queues ensures that this is the case.
file_key = incoming_message_body[0].get("file_key")
message_id = (incoming_message_body[0].get("row_id", "")).split("^")[0]
vaccine_type = incoming_message_body[0].get("vaccine_type")
supplier = incoming_message_body[0].get("supplier")
supplier_queue = f"{supplier}_{vaccine_type}"
created_at_formatted_string = incoming_message_body[0].get("created_at_formatted_string")

for message in incoming_message_body:
array_of_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
row_count = get_row_count_stream(SOURCE_BUCKET_NAME, f"processing/{file_key}")
update_ack_file(file_key, created_at_formatted_string=created_at_formatted_string, ack_data_rows=array_of_rows, row_count=row_count)
return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))

update_ack_file(file_key, message_id, supplier_queue, created_at_formatted_string, ack_data_rows)

def get_row_count_stream(bucket_name, key):
    """Return the number of non-blank lines in the S3 object at bucket_name/key,
    streaming the body line-by-line rather than loading it all into memory."""
    s3_object = s3_client.get_object(Bucket=bucket_name, Key=key)
    non_blank = 0
    for line in s3_object['Body'].iter_lines():
        if line.strip():
            non_blank += 1
    return non_blank
return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}
88 changes: 35 additions & 53 deletions ack_backend/src/audit_table.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,49 @@
"""Add the filename to the audit table and check for duplicates."""

import os

from typing import Union
from boto3.dynamodb.conditions import Key,Attr
from boto3.dynamodb.conditions import Key
from clients import dynamodb_client, dynamodb_resource, logger
from errors import UnhandledAuditTableError


def update_audit_table_status(file_key: str) -> str:
from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, FileStatus, AuditTableKeys


def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
    """
    Checks for queued files.
    Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
    """
    # Query the queue-name GSI for items on this queue whose status is "Queued".
    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
        IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
        KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
        & Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
    )

    queued_files_details: list = queued_files_found_in_audit_table["Items"]

    # Return the oldest queued file
    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None


def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
"""Updates the status in the audit table to 'Processed' and returns the queue name."""
try:
table_name = os.environ["AUDIT_TABLE_NAME"]
file_name_gsi = "filename_index"
file_name_response = dynamodb_resource.Table(table_name).query(
IndexName=file_name_gsi, KeyConditionExpression=Key("filename").eq(file_key)
)
items = file_name_response.get("Items", [])
message_id = items[0].get("message_id")
queue_name = items[0].get("queue_name")
# Add to the audit table
# Update the status in the audit table to "Processed"
dynamodb_client.update_item(
TableName=table_name,
Key={"message_id": {"S": message_id}},
TableName=AUDIT_TABLE_NAME,
Key={AuditTableKeys.MESSAGE_ID: {"S": message_id}},
UpdateExpression="SET #status = :status",
ExpressionAttributeNames={"#status": "status"},
ExpressionAttributeValues={":status": {"S": "Processed"}},
ConditionExpression="attribute_exists(message_id)"
ExpressionAttributeValues={":status": {"S": FileStatus.PROCESSED}},
ConditionExpression="attribute_exists(message_id)",
)
logger.info("%s file, with message id %s, and the status successfully updated to audit table", file_key, message_id)
return queue_name
except Exception as error: # pylint: disable = broad-exception-caught
error_message = error #f"Error adding {file_key} to the audit table"
logger.error(error_message)
raise UnhandledAuditTableError(error_message) from error

def get_queued_file_details(queue_name: str) -> tuple[Union[None,str],Union[None,str]]:
"""
Check for queued files which return none or oldest file queued for processing.
Returns a tuple in the format (file_name, message_id) for the oldest file.
Defaults to (none,none) if no file found in queued status
"""
table_name = os.environ["AUDIT_TABLE_NAME"]
queue_name_gsi = "queue_name_index"

queue_response = dynamodb_resource.Table(table_name).query(
IndexName=queue_name_gsi,
KeyConditionExpression=Key("queue_name").eq(queue_name)
& Key("status").eq("Queued"),
logger.info(
"The status of %s file, with message id %s, was successfully updated to %s in the audit table",
file_key,
message_id,
FileStatus.PROCESSED,
)
if queue_response["Items"]:
file_name, message_id = get_file_name(queue_response)
return file_name, message_id
else:
return None, None

def get_file_name(queue_response: dict) -> tuple[str,str]:
"""
Returns (file_name, message_id) for the oldest file.
"""
sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"])
first_record = sorted_item[0]
file_name = first_record.get("filename")
message_id = first_record.get("message_id")
return file_name, message_id
except Exception as error: # pylint: disable = broad-exception-caught
logger.error(error)
raise UnhandledAuditTableError(error) from error
65 changes: 46 additions & 19 deletions ack_backend/src/constants.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,49 @@
"""Constants for ack lambda"""

import os
from utils_for_ack_lambda import get_environment

class Constants:
"""Constants for ack lambda"""

ack_headers = [
"MESSAGE_HEADER_ID",
"HEADER_RESPONSE_CODE",
"ISSUE_SEVERITY",
"ISSUE_CODE",
"ISSUE_DETAILS_CODE",
"RESPONSE_TYPE",
"RESPONSE_CODE",
"RESPONSE_DISPLAY",
"RECEIVED_TIME",
"MAILBOX_FROM",
"LOCAL_ID",
"IMMS_ID",
"OPERATION_OUTCOME",
"MESSAGE_DELIVERY",
]
ENVIRONMENT = get_environment()
SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources"
ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME")
AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME")
AUDIT_TABLE_FILENAME_GSI = "filename_index"
AUDIT_TABLE_QUEUE_NAME_GSI = "queue_name_index"
FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")


class FileStatus:
    """File status constants: values stored in the audit table's "status" attribute."""

    QUEUED = "Queued"  # file is waiting to be processed
    PROCESSING = "Processing"
    PROCESSED = "Processed"  # written by the ack lambda once the file completes
    DUPLICATE = "Duplicate"


class AuditTableKeys:
    """Audit table keys: attribute names used when querying and updating the audit table."""

    FILENAME = "filename"
    MESSAGE_ID = "message_id"  # key used when updating an item's status
    QUEUE_NAME = "queue_name"  # partition key of the queue-name GSI
    STATUS = "status"
    TIMESTAMP = "timestamp"  # used to order queued files oldest-first


# Column headers for the acknowledgement (ack) file rows, in output order.
ACK_HEADERS = [
    "MESSAGE_HEADER_ID",
    "HEADER_RESPONSE_CODE",
    "ISSUE_SEVERITY",
    "ISSUE_CODE",
    "ISSUE_DETAILS_CODE",
    "RESPONSE_TYPE",
    "RESPONSE_CODE",
    "RESPONSE_DISPLAY",
    "RECEIVED_TIME",
    "MAILBOX_FROM",
    "LOCAL_ID",
    "IMMS_ID",
    "OPERATION_OUTCOME",
    "MESSAGE_DELIVERY",
]
26 changes: 1 addition & 25 deletions ack_backend/src/errors.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,5 @@
"""Custom exceptions for the Filename Processor."""


class DuplicateFileError(Exception):
"""A custom exception for when it is identified that the file is a duplicate."""


class ProcessingError(Exception):
"""A custom exception for when it is identified that supplier_vaccine file is under processing"""
"""Custom exceptions for the Ack lambda."""


class UnhandledAuditTableError(Exception):
"""A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""


class VaccineTypePermissionsError(Exception):
"""A custom exception for when the supplier does not have the necessary vaccine type permissions."""


class InvalidFileKeyError(Exception):
"""A custom exception for when the file key is invalid."""


class InvalidSupplierError(Exception):
"""A custom exception for when the supplier has not been correctly identified."""


class UnhandledSqsError(Exception):
"""A custom exception for when an unexpected error occurs whilst sending a message to SQS."""
Loading

0 comments on commit 4014a9a

Please sign in to comment.