Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amb 2343 ecs logging for inf ack #298

Merged
merged 12 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 0 additions & 108 deletions e2e/test_batch_processing.py

This file was deleted.

2 changes: 1 addition & 1 deletion filenameprocessor/src/audit_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def add_to_audit_table(message_id: str, file_key: str, created_at_formatted_str:
Item={
"message_id": {"S": message_id},
"filename": {"S": file_key},
"status": {"S": "Processed"},
"status": {"S": "Not processed - duplicate" if duplicate_exists else "Processed"},
"timestamp": {"S": created_at_formatted_str},
},
ConditionExpression="attribute_not_exists(message_id)", # Prevents accidental overwrites
Expand Down
2 changes: 1 addition & 1 deletion filenameprocessor/src/file_name_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def handle_record(record) -> dict:
UnhandledSqsError,
Exception,
) as error:
logger.error("Error processing file'%s': %s", file_key, str(error))
logger.error("Error processing file '%s': %s", file_key, str(error))

# Create ack file
# (note that error may have occurred before message_id and created_at_formatted_string were generated)
Expand Down
6 changes: 4 additions & 2 deletions filenameprocessor/src/logging_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ def generate_and_send_logs(


def logging_decorator(func):
"""Sends the appropriate logs to Cloudwatch and Firehose based on the function result.
"""
Sends the appropriate logs to Cloudwatch and Firehose based on the function result.
NOTE: The function must return a dictionary as its only return value. The dictionary is expected to contain
all of the required additional details for logging.
NOTE: Logs will include the result of the function call or, in the case of an Exception being raised,
a status code of 500 and the error message."""
a status code of 500 and the error message.
"""

@wraps(func)
def wrapper(*args, **kwargs):
Expand Down
124 changes: 37 additions & 87 deletions recordprocessor/src/batch_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
import json
import os
import time
from constants import Constants
from utils_for_recordprocessor import get_csv_content_dict_reader
from unique_permission import get_unique_action_flags_from_s3
from make_and_upload_ack_file import make_and_upload_ack_file
from get_operation_permissions import get_operation_permissions
from process_row import process_row
from mappings import Vaccine
from send_to_kinesis import send_to_kinesis
from clients import logger
from file_level_validation import file_level_validation
from errors import NoOperationPermissions, InvalidHeaders


def process_csv_to_fhir(incoming_message_body: dict) -> None:
Expand All @@ -20,91 +16,45 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
and documents the outcome for each row in the ack file.
"""
logger.info("Event: %s", incoming_message_body)
# Get details needed to process file
file_id = incoming_message_body.get("message_id")
vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum
vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper()
)
supplier = incoming_message_body.get("supplier").upper()
file_key = incoming_message_body.get("filename")
permission = incoming_message_body.get("permission")
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
allowed_operations = get_operation_permissions(vaccine, permission)

# Fetch the data
bucket_name = os.getenv("SOURCE_BUCKET_NAME")
csv_reader, csv_data = get_csv_content_dict_reader(bucket_name, file_key)
is_valid_headers = validate_content_headers(csv_reader)

# Validate has permission to perform at least one of the requested actions
action_flag_check = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data)

if not action_flag_check or not is_valid_headers:
make_and_upload_ack_file(file_id, file_key, False, False, created_at_formatted_string)
else:
# Initialise the accumulated_ack_file_content with the headers
make_and_upload_ack_file(file_id, file_key, True, True, created_at_formatted_string)

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)
# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(vaccine, allowed_operations, row)

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine.value,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}

send_to_kinesis(supplier, outgoing_message_body)
try:
interim_message_body = file_level_validation(incoming_message_body=incoming_message_body)
except (InvalidHeaders, NoOperationPermissions, Exception): # pylint: disable=broad-exception-caught
# If the file is invalid, processing should cease immediately
return None

file_id = interim_message_body.get("message_id")
vaccine = interim_message_body.get("vaccine")
supplier = interim_message_body.get("supplier")
file_key = interim_message_body.get("file_key")
allowed_operations = interim_message_body.get("allowed_operations")
created_at_formatted_string = interim_message_body.get("created_at_formatted_string")
csv_reader = interim_message_body.get("csv_dict_reader")

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)

# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(vaccine, allowed_operations, row)

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine.value,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}

send_to_kinesis(supplier, outgoing_message_body)

logger.info("Total rows processed: %s", row_count)


def validate_content_headers(csv_content_reader):
"""Returns a bool to indicate whether the given CSV headers match the 34 expected headers exactly"""
return csv_content_reader.fieldnames == Constants.expected_csv_headers


def validate_action_flag_permissions(
supplier: str, vaccine_type: str, allowed_permissions_list: list, csv_data
) -> bool:
"""
Returns True if the supplier has permission to perform ANY of the requested actions for the given vaccine type,
else False.
"""
# If the supplier has full permissions for the vaccine type, return True
if f"{vaccine_type}_FULL" in allowed_permissions_list:
return True

# Get unique ACTION_FLAG values from the S3 file
operations_requested = get_unique_action_flags_from_s3(csv_data)

# Convert action flags into the expected operation names
requested_permissions_set = {
f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
}

# Check if any of the CSV permissions match the allowed permissions
if requested_permissions_set.intersection(allowed_permissions_list):
logger.info(
"%s permissions %s match one of the requested permissions required to %s",
supplier,
allowed_permissions_list,
requested_permissions_set,
)
return True

return False


def main(event: str) -> None:
"""Process each row of the file"""
logger.info("task started")
Expand All @@ -114,7 +64,7 @@ def main(event: str) -> None:
except Exception as error: # pylint: disable=broad-exception-caught
logger.error("Error processing message: %s", error)
end = time.time()
logger.info(f"Total time for completion:{round(end - start, 5)}s")
logger.info("Total time for completion: %ss", round(end - start, 5))


if __name__ == "__main__":
Expand Down
5 changes: 5 additions & 0 deletions recordprocessor/src/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

s3_client = boto3_client("s3", region_name=REGION_NAME)
kinesis_client = boto3_client("kinesis", region_name=REGION_NAME)
sqs_client = boto3_client("sqs", region_name=REGION_NAME)
firehose_client = boto3_client("firehose", region_name=REGION_NAME)


# Logger
logging.basicConfig(level="INFO")
logger = logging.getLogger()
logger.setLevel("INFO")
9 changes: 9 additions & 0 deletions recordprocessor/src/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Custom exceptions for the Record Processor."""


class NoOperationPermissions(Exception):
    """Raised when the supplier lacks permission for every operation requested in the file."""


class InvalidHeaders(Exception):
    """Raised when the incoming file's headers do not match the expected set."""
Loading
Loading