Skip to content

Commit

Permalink
Move file level validation into separate function and add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraBenson committed Dec 17, 2024
1 parent c551b4e commit 287bb53
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 137 deletions.
121 changes: 35 additions & 86 deletions recordprocessor/src/batch_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
import json
import os
import time
from constants import Constants
from utils_for_recordprocessor import get_csv_content_dict_reader
from unique_permission import get_unique_action_flags_from_s3
from make_and_upload_ack_file import make_and_upload_ack_file
from get_operation_permissions import get_operation_permissions
from process_row import process_row
from mappings import Vaccine
from send_to_kinesis import send_to_kinesis
from clients import logger
from file_level_validation import initial_file_validation
from errors import NoOperationPermissions, InvalidHeaders


def process_csv_to_fhir(incoming_message_body: dict) -> None:
Expand All @@ -20,91 +16,44 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
and documents the outcome for each row in the ack file.
"""
logger.info("Event: %s", incoming_message_body)
# Get details needed to process file
file_id = incoming_message_body.get("message_id")
vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum
vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper()
)
supplier = incoming_message_body.get("supplier").upper()
file_key = incoming_message_body.get("filename")
permission = incoming_message_body.get("permission")
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")
allowed_operations = get_operation_permissions(vaccine, permission)

# Fetch the data
bucket_name = os.getenv("SOURCE_BUCKET_NAME")
csv_reader, csv_data = get_csv_content_dict_reader(bucket_name, file_key)
is_valid_headers = validate_content_headers(csv_reader)

# Validate has permission to perform at least one of the requested actions
action_flag_check = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data)

if not action_flag_check or not is_valid_headers:
make_and_upload_ack_file(file_id, file_key, False, False, created_at_formatted_string)
else:
# Initialise the accumulated_ack_file_content with the headers
make_and_upload_ack_file(file_id, file_key, True, True, created_at_formatted_string)

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)
# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(vaccine, allowed_operations, row)

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine.value,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}

send_to_kinesis(supplier, outgoing_message_body)
try:
interim_message_body = initial_file_validation(incoming_message_body)
except (InvalidHeaders, NoOperationPermissions):
# If the file is invalid, processing should cease immediately
return None

file_id = interim_message_body.get("message_id")
vaccine = interim_message_body.get("vaccine")
supplier = interim_message_body.get("supplier")
file_key = interim_message_body.get("file_key")
allowed_operations = interim_message_body.get("allowed_operations")
created_at_formatted_string = interim_message_body.get("created_at_formatted_string")
csv_reader = interim_message_body.get("csv_dict_reader")

row_count = 0 # Initialize a counter for rows
for row in csv_reader:
row_count += 1
row_id = f"{file_id}^{row_count}"
logger.info("MESSAGE ID : %s", row_id)
# Process the row to obtain the details needed for the message_body and ack file
details_from_processing = process_row(vaccine, allowed_operations, row)

# Create the message body for sending
outgoing_message_body = {
"row_id": row_id,
"file_key": file_key,
"supplier": supplier,
"vax_type": vaccine.value,
"created_at_formatted_string": created_at_formatted_string,
**details_from_processing,
}

send_to_kinesis(supplier, outgoing_message_body)

logger.info("Total rows processed: %s", row_count)


def validate_content_headers(csv_content_reader):
"""Returns a bool to indicate whether the given CSV headers match the 34 expected headers exactly"""
return csv_content_reader.fieldnames == Constants.expected_csv_headers


def validate_action_flag_permissions(
supplier: str, vaccine_type: str, allowed_permissions_list: list, csv_data
) -> bool:
"""
Returns True if the supplier has permission to perform ANY of the requested actions for the given vaccine type,
else False.
"""
# If the supplier has full permissions for the vaccine type, return True
if f"{vaccine_type}_FULL" in allowed_permissions_list:
return True

# Get unique ACTION_FLAG values from the S3 file
operations_requested = get_unique_action_flags_from_s3(csv_data)

# Convert action flags into the expected operation names
requested_permissions_set = {
f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
}

# Check if any of the CSV permissions match the allowed permissions
if requested_permissions_set.intersection(allowed_permissions_list):
logger.info(
"%s permissions %s match one of the requested permissions required to %s",
supplier,
allowed_permissions_list,
requested_permissions_set,
)
return True

return False


def main(event: str) -> None:
"""Process each row of the file"""
logger.info("task started")
Expand Down
9 changes: 9 additions & 0 deletions recordprocessor/src/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Custom exceptions for the Record Processor."""


class NoOperationPermissions(Exception):
"""A custom exception for when the supplier has no permissions for any of the requested operations."""


class InvalidHeaders(Exception):
"""A custom exception for when the file headers are invalid."""
88 changes: 88 additions & 0 deletions recordprocessor/src/file_level_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os
from constants import Constants
from unique_permission import get_unique_action_flags_from_s3
from clients import logger
from make_and_upload_ack_file import make_and_upload_ack_file
from mappings import Vaccine
from utils_for_recordprocessor import get_csv_content_dict_reader
from errors import InvalidHeaders, NoOperationPermissions


def validate_content_headers(csv_content_reader) -> None:
"""Raises an InvalidHeaders error if the headers in the CSV file do not match the expected headers."""
if csv_content_reader.fieldnames != Constants.expected_csv_headers:
raise InvalidHeaders("File headers are invalid.")


def validate_action_flag_permissions(
supplier: str, vaccine_type: str, allowed_permissions_list: list, csv_data: str
) -> set:
"""
Validates that the supplier has permission to perform at least one of the requested operations for the given
vaccine type and returns the set of allowed operations for that vaccine type.
Raises a NoPermissionsError if the supplier does not have permission to perform any of the requested operations.
"""
# If the supplier has full permissions for the vaccine type, return True
if f"{vaccine_type}_FULL" in allowed_permissions_list:
return {"CREATE", "UPDATE", "DELETE"}

# Get unique ACTION_FLAG values from the S3 file
operations_requested = get_unique_action_flags_from_s3(csv_data)

# Convert action flags into the expected operation names
requested_permissions_set = {
f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
}

# Check if any of the CSV permissions match the allowed permissions
if not requested_permissions_set.intersection(allowed_permissions_list):
raise NoOperationPermissions(f"{supplier} does not have permissions to perform any of the requested actions.")

logger.info(
"%s permissions %s match one of the requested permissions required to %s",
supplier,
allowed_permissions_list,
requested_permissions_set,
)

return {perm.split("_")[1].upper() for perm in allowed_permissions_list if perm.startswith(vaccine_type)}


def initial_file_validation(incoming_message_body: dict) -> None:
"""Validates that the csv headers are correct and that the supplier has permission to perform at least one of
the requested operations. Returns an interim message body for row level processing."""

file_id = incoming_message_body.get("message_id")
vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum
vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper()
)
supplier = incoming_message_body.get("supplier").upper()
file_key = incoming_message_body.get("filename")
permission = incoming_message_body.get("permission")
created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")

# Fetch the data
bucket_name = os.getenv("SOURCE_BUCKET_NAME")
csv_reader, csv_data = get_csv_content_dict_reader(bucket_name, file_key)

try:
validate_content_headers(csv_reader)

# Validate has permission to perform at least one of the requested actions
allowed_operations_set = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data)
except (InvalidHeaders, NoOperationPermissions):
make_and_upload_ack_file(file_id, file_key, False, False, created_at_formatted_string)
raise

# Initialise the accumulated_ack_file_content with the headers
make_and_upload_ack_file(file_id, file_key, True, True, created_at_formatted_string)

return {
"message_id": file_id,
"vaccine": vaccine,
"supplier": supplier,
"file_key": file_key,
"allowed_operations": allowed_operations_set,
"created_at_formatted_string": created_at_formatted_string,
"csv_dict_reader": csv_reader,
}
12 changes: 0 additions & 12 deletions recordprocessor/src/get_operation_permissions.py

This file was deleted.

2 changes: 1 addition & 1 deletion recordprocessor/src/logging_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from datetime import datetime
from functools import wraps
from s3_clients import firehose_client, logger
from clients import firehose_client, logger

STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", "immunisation-fhir-api-internal-dev-splunk-firehose")

Expand Down
Loading

0 comments on commit 287bb53

Please sign in to comment.