diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a13425ee..b2c79dca 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,3 +17,11 @@ repos: rev: v1.4.1 hooks: - id: remove-tabs + #- repo: https://github.com/pre-commit/mirrors-isort + # rev: v5.10.1 + # hooks: + # - id: isort + # name: isort (python) + # entry: isort + # language: python + # types: [python] diff --git a/Pipfile b/Pipfile index 666f6bd0..198a4c8e 100644 --- a/Pipfile +++ b/Pipfile @@ -18,3 +18,6 @@ moto = "~=4.1" datacompy = "~=0.8" docker = "~=6.1" ecs_logging = "~=2.0" +# flask libraries required for moto_server +flask = "~=2.0" +flask-cors = "~=3.0" diff --git a/config/develop/namespaced/glue-workflow.yaml b/config/develop/namespaced/glue-workflow.yaml index 76c91786..d8deb13e 100644 --- a/config/develop/namespaced/glue-workflow.yaml +++ b/config/develop/namespaced/glue-workflow.yaml @@ -7,6 +7,7 @@ dependencies: - develop/namespaced/glue-job-JSONToParquet.yaml - develop/namespaced/glue-job-compare-parquet.yaml - develop/glue-job-role.yaml + - develop/s3-cloudformation-bucket.yaml parameters: Namespace: {{ stack_group_config.namespace }} JsonBucketName: {{ stack_group_config.intermediate_bucket_name }} @@ -16,6 +17,8 @@ parameters: S3ToJsonJobName: !stack_output_external "{{ stack_group_config.namespace }}-glue-job-S3ToJsonS3::JobName" CompareParquetStagingNamespace: {{ stack_group_config.namespace }} CompareParquetMainNamespace: "main" + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + CloudformationBucketName: {{ stack_group_config.template_bucket_name }} stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: diff --git a/config/prod/namespaced/glue-workflow.yaml b/config/prod/namespaced/glue-workflow.yaml index 8ac7f8f9..d2260f8b 100644 --- a/config/prod/namespaced/glue-workflow.yaml +++ b/config/prod/namespaced/glue-workflow.yaml @@ -7,6 +7,7 @@ dependencies: - prod/namespaced/glue-job-JSONToParquet.yaml - prod/namespaced/glue-job-compare-parquet.yaml - prod/glue-job-role.yaml + - prod/s3-cloudformation-bucket.yaml parameters: Namespace: {{ stack_group_config.namespace }} JsonBucketName: {{ stack_group_config.intermediate_bucket_name }} @@ -16,6 +17,8 @@ parameters: S3ToJsonJobName: !stack_output_external "{{ stack_group_config.namespace }}-glue-job-S3ToJsonS3::JobName" CompareParquetStagingNamespace: "staging" CompareParquetMainNamespace: "main" + S3SourceBucketName: {{ stack_group_config.input_bucket_name }} + CloudformationBucketName: {{ stack_group_config.template_bucket_name }} stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: diff --git a/src/glue/jobs/compare_parquet_datasets.py b/src/glue/jobs/compare_parquet_datasets.py index 1627ad02..525bf547 100644 --- a/src/glue/jobs/compare_parquet_datasets.py +++ b/src/glue/jobs/compare_parquet_datasets.py @@ -1,47 +1,108 @@ -import argparse -from io import StringIO +from collections import namedtuple +import datetime import json import logging import os import sys +import zipfile +from io import BytesIO, StringIO +from typing import Dict, List, NamedTuple, Union -from awsglue.utils import getResolvedOptions import boto3 import datacompy import pandas as pd -from pyarrow import fs +import pyarrow.dataset as ds import pyarrow.parquet as pq +from awsglue.utils import getResolvedOptions +from pyarrow import fs logger = logging.getLogger() logger.setLevel(logging.INFO) +ADULTS = "adults_v1" +PEDIATRIC = "pediatric_v1" INDEX_FIELD_MAP = { "dataset_enrolledparticipants": 
["ParticipantIdentifier"], "dataset_fitbitprofiles": ["ParticipantIdentifier", "ModifiedDate"], - "dataset_fitbitdevices": ["ParticipantIdentifier", "Date"], - "dataset_fitbitactivitylogs": ["LogId"], + "dataset_fitbitdevices": ["ParticipantIdentifier", "Date", "Device"], + "dataset_fitbitactivitylogs": ["ParticipantIdentifier", "LogId"], "dataset_fitbitdailydata": ["ParticipantIdentifier", "Date"], + "dataset_fitbitecg": ["ParticipantIdentifier", "FitbitEcgKey"], "dataset_fitbitintradaycombined": ["ParticipantIdentifier", "Type", "DateTime"], "dataset_fitbitrestingheartrates": ["ParticipantIdentifier", "Date"], - "dataset_fitbitsleeplogs": ["LogId"], - "dataset_healthkitv2characteristics": ["HealthKitCharacteristicKey"], - "dataset_healthkitv2samples": ["HealthKitSampleKey"], - "dataset_healthkitv2samples_deleted": ["HealthKitSampleKey"], - "dataset_healthkitv2heartbeat": ["HealthKitHeartbeatSampleKey"], - "dataset_healthkitv2statistics": ["HealthKitStatisticKey"], - "dataset_healthkitv2clinicalrecords": ["HealthKitClinicalRecordKey"], - "dataset_healthkitv2electrocardiogram": ["HealthKitECGSampleKey"], - "dataset_healthkitv2workouts": ["HealthKitWorkoutKey"], - "dataset_healthkitv2activitysummaries": ["HealthKitActivitySummaryKey"], - "dataset_googlefitsamples": ["GoogleFitSampleKey"], - "dataset_symptomlog": ["DataPointKey"], + "dataset_fitbitsleeplogs": ["ParticipantIdentifier", "LogId"], + "dataset_healthkitv2characteristics": [ + "ParticipantIdentifier", + "HealthKitCharacteristicKey", + ], + "dataset_healthkitv2samples": ["ParticipantIdentifier", "HealthKitSampleKey"], + "dataset_healthkitv2heartbeat": [ + "ParticipantIdentifier", + "HealthKitHeartbeatSampleKey", + ], + "dataset_healthkitv2statistics": ["ParticipantIdentifier", "HealthKitStatisticKey"], + "dataset_healthkitv2clinicalrecords": [ + "ParticipantIdentifier", + "HealthKitClinicalRecordKey", + ], + "dataset_healthkitv2electrocardiogram": [ + "ParticipantIdentifier", + "HealthKitECGSampleKey", + ], + "dataset_healthkitv2workouts": ["ParticipantIdentifier", "HealthKitWorkoutKey"], + "dataset_healthkitv2activitysummaries": [ + "ParticipantIdentifier", + "HealthKitActivitySummaryKey", + ], + "dataset_garminactivitydetailssummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garminactivitysummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garminbloodpressuresummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garmindailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "dataset_garminepochsummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garminhealthsnapshotsummary": [ + "ParticipantIdentifier", + "StartTimeInSeconds", + ], + "dataset_garminhrvsummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "dataset_garminmanuallyupdatedactivitysummary": [ + "ParticipantIdentifier", + "SummaryId", + ], + "dataset_garminmoveiqactivitysummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garminpulseoxsummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garminrespirationsummary": ["ParticipantIdentifier", "SummaryId"], + "dataset_garminsleepsummary": [ + "ParticipantIdentifier", + "StartTimeInSeconds", + "DurationInSeconds", + "Validation", + ], + "dataset_garminstressdetailsummary": [ + "ParticipantIdentifier", + "StartTimeInSeconds", + ], + "dataset_garminthirdpartydailysummary": [ + "ParticipantIdentifier", + "StartTimeInSeconds", + ], + "dataset_garminusermetricssummary": ["ParticipantIdentifier", "CalenderDate"], + "dataset_googlefitsamples": ["ParticipantIdentifier", 
"GoogleFitSampleKey"], + "dataset_symptomlog": ["ParticipantIdentifier", "DataPointKey"], } def read_args() -> dict: """Returns the specific params that our code needs to run""" args = getResolvedOptions( - sys.argv, ["data-type", "staging-namespace", "main-namespace", "parquet-bucket"] + sys.argv, + [ + "data-type", + "staging-namespace", + "main-namespace", + "parquet-bucket", + "input-bucket", + "cfn-bucket", + ], ) for arg in args: validate_args(args[arg]) @@ -97,14 +158,16 @@ def get_duplicated_columns(dataset: pd.DataFrame) -> list: def has_common_cols(staging_dataset: pd.DataFrame, main_dataset: pd.DataFrame) -> list: """Gets the list of common columns between two dataframes - TODO: Could look into depreciating this and using datacompy.intersect_columns function""" + TODO: Could look into depreciating this and using datacompy.intersect_columns function + """ common_cols = staging_dataset.columns.intersection(main_dataset.columns).tolist() return common_cols != [] def get_missing_cols(staging_dataset: pd.DataFrame, main_dataset: pd.DataFrame) -> list: """Gets the list of missing columns present in main but not in staging - TODO: Could look into depreciating this and using datacompy.df2_unq_columns function""" + TODO: Could look into depreciating this and using datacompy.df2_unq_columns function + """ missing_cols = main_dataset.columns.difference(staging_dataset.columns).tolist() return missing_cols @@ -113,20 +176,33 @@ def get_additional_cols( staging_dataset: pd.DataFrame, main_dataset: pd.DataFrame ) -> list: """Gets the list of additional columns present in staging but not in main - TODO: Could look into depreciating this and using datacompy.df1_unq_columns function""" + TODO: Could look into depreciating this and using datacompy.df1_unq_columns function + """ add_cols = staging_dataset.columns.difference(main_dataset.columns).tolist() return add_cols def convert_dataframe_to_text(dataset: pd.DataFrame) -> str: - """Converts a pandas DataFrame into a string to save as csv in S3 + """Converts a pandas DataFrame into a string to save as csv in S3. + If the dataframe is empty or has empty columns, it should just + return an empty string + + NOTE: This is because s3.put_object only allows bytes or string + objects when saving them to S3 + + Args: + dataset (pd.DataFrame): input dataset - NOTE: This is because s3.put_object only allows bytes or string - objects when saving them to S3""" - csv_buffer = StringIO() - dataset.to_csv(csv_buffer) - csv_content = csv_buffer.getvalue() - return csv_content + Returns: + str: the resulting string conversion + """ + if not dataset.empty: + csv_buffer = StringIO() + dataset.to_csv(csv_buffer) + csv_content = csv_buffer.getvalue() + return csv_content + else: + return "" def get_S3FileSystem_from_session( @@ -150,54 +226,263 @@ def get_S3FileSystem_from_session( return s3_fs -def get_parquet_dataset( - dataset_key: str, s3_filesystem: fs.S3FileSystem -) -> pd.DataFrame: +def get_export_end_date(filename: str) -> Union[str, None]: + """Gets the export end date (converted to isoformat) based on the filename. + + We can have filenames with with: + - {DateType}_{YYYYMMDD}-{YYYYMMDD}.json + - {DateType}_{YYYYMMDD}.json format + This function does handling for both. The export end date is always the + last component of the filename. 
+ + Args: + filename (str): name of the input json file + + Returns: + Union[str, None]: export end date in isoformat if it exists """ - Returns a Parquet dataset on S3 as a pandas dataframe + if ( + filename is None + or filename == "" + or len(os.path.splitext(filename)[0].split("_")) <= 1 + ): + return None + + filename_components = os.path.splitext(filename)[0].split("_") + + if "-" in filename_components[-1]: + _, end_date = filename_components[-1].split("-") + end_date = datetime.datetime.strptime(end_date, "%Y%m%d") + else: + end_date = datetime.datetime.strptime(filename_components[-1], "%Y%m%d") + export_end_date = end_date.isoformat() + return export_end_date + + +def get_cohort_from_s3_uri(s3_uri: str) -> Union[str, None]: + """Gets the cohort (pediatric_v1 or adults_v1) + from the s3 uri of the export. + + The s3 uri of the export has the following expected format: + s3://{bucket_name}/{namespace}/{cohort}/{export_name} Args: - dataset_key (str): The URI of the parquet dataset. - s3_filesystem (S3FileSystem): A fs.S3FileSystem object + s3_uri (str): the S3 uri Returns: - pandas.DataFrame + Union[str, None]: the cohort if it exists + """ + cohort = None + if ADULTS in s3_uri: + cohort = ADULTS + elif PEDIATRIC in s3_uri: + cohort = PEDIATRIC + return cohort + + +def get_data_type_from_filename(filename: str) -> Union[str, None]: + """Gets the data type from the JSON filename. + + A filename can be of the following format: + {DataType}_[{DataSubType}_][Deleted_]{YYYYMMDD}[-{YYYYMMDD}].json + + Since we don't have support for DataSubType, we only support top + level datatypes but because we only have the subtypes + of the dataset in the exports, we can still use that to get the metadata + we need. We parse out deleted healthkit data types as we don't process them + into parquet. - TODO: Currently, internal pyarrow things like to_table as a - result of the read_table function below takes a while as the dataset - grows bigger. Could find a way to optimize that. + Args: + filename (str): JSON filename + + Returns: + Union[str, None]: the data type otherwise returns None as we + currently don't support comparingsubtypes in data """ - table_source = dataset_key.split("s3://")[-1] - parquet_dataset = pq.read_table(source=table_source, filesystem=s3_filesystem) - return parquet_dataset.to_pandas() + filename_components = os.path.splitext(filename)[0].split("_") + data_type = filename_components[0] + if "HealthKitV2" in data_type and filename_components[-2] == "Deleted": + data_type = f"{data_type}_Deleted" + formatted_data_type = f"dataset_{data_type.lower()}" + + if formatted_data_type in INDEX_FIELD_MAP.keys(): + return formatted_data_type + else: + return None -def get_folders_in_s3_bucket( - s3: boto3.client, bucket_name: str, namespace: str -) -> list: - """Gets the folders in the S3 bucket under the specific namespace +def get_json_files_in_zip_from_s3( + s3: boto3.client, input_bucket: str, s3_uri: str +) -> List[str]: + """Parses through a zipped export and gets the list of json files. 
+ Accepted JSON files cannot be: + - 0 file size + - be a folder (have / in the name) + - Have 'Manifest' in the filename Args: - s3 (boto3.client): authenticated s3 client - bucket_name (str): name of the S3 bucket to look into - namespace (str): namespace of the path to look for folders in + s3 (boto3.client): s3 client connection + input_bucket (str): name of the input bucket containing the zipped exports + s3_uri (str): s3 uri to the zipped export Returns: - list: folder names inside S3 bucket + List[str]: list of json file names """ + # Get the object from S3 + s3_key = s3_uri.split(f"s3://{input_bucket}/")[1] + s3_obj = s3.get_object(Bucket=input_bucket, Key=s3_key) + + # Use BytesIO to treat the zip content as a file-like object + with zipfile.ZipFile(BytesIO(s3_obj["Body"].read())) as z: + # Open the specific JSON file within the zip file + non_empty_contents = [ + f.filename + for f in z.filelist + if "/" not in f.filename + and "Manifest" not in f.filename + and f.file_size > 0 + ] + return non_empty_contents - response = s3.list_objects_v2( - Bucket=bucket_name, Prefix=f"{namespace}/parquet/", Delimiter="/" + +def get_integration_test_exports_json( + s3: boto3.client, cfn_bucket: str, staging_namespace: str +) -> List[str]: + """Reads in the integration test exports json from the cloudformation + bucket that contains the ~2 weeks of production data exports + to use in the staging pipeline + + Args: + s3 (boto3.client): s3 client connection + cfn_bucket (str): cloudformation bucket name + staging_namespace (str): name of namespace containing the "new" data + + Returns: + List[str]: list of the json exports + """ + # read in the json filelist + s3_response_object = s3.get_object( + Bucket=cfn_bucket, Key=f"{staging_namespace}/integration_test_exports.json" ) - if "CommonPrefixes" in response.keys(): - contents = response["CommonPrefixes"] - folders = [ - os.path.normpath(content["Prefix"]).split(os.sep)[-1] - for content in contents - ] - else: - folders = [] - return folders + json_content = s3_response_object["Body"].read().decode("utf-8") + filelist = json.loads(json_content) + return filelist + + +def get_exports_filter_values( + s3: boto3.client, + data_type: str, + input_bucket: str, + cfn_bucket: str, + staging_namespace: str, +) -> Dict[str, str]: + """Parses through the json exports and gets the values + for the cohort and export_end_date to filter on for our + main parquet dataset. The exports_filter will have the following + structure: + { + : [], + ... 
+ } + Args: + s3 (boto3.client): s3 client connection + data_type (str): data type of the dataset + input_bucket (str): input data bucket name + cfn_bucket (str): cloudformation bucket name + staging_namespace (str): name of namespace containing the "new" data + + Returns: + Dict[str, str]: a dict containing the column(s) (key(s)) and values to + filter on + """ + filelist = get_integration_test_exports_json(s3, cfn_bucket, staging_namespace) + # create the dictionary of export end dates and cohort + export_end_date_vals = {} + for s3_uri in filelist: + json_files = get_json_files_in_zip_from_s3(s3, input_bucket, s3_uri) + cur_cohort = get_cohort_from_s3_uri(s3_uri) + for json_file in json_files: + cur_data_type = get_data_type_from_filename(json_file) + cur_export_end_date = get_export_end_date(json_file) + if cur_data_type == data_type: + if cur_cohort in export_end_date_vals.keys(): + export_end_date_vals[cur_cohort].append(cur_export_end_date) + else: + export_end_date_vals[cur_cohort] = [cur_export_end_date] + else: + continue + + return export_end_date_vals + + +def convert_filter_values_to_expression(filter_values: Dict[str, str]) -> ds.Expression: + """Converts the dict of the keys, values to filter on + into filter conditions in the form of a pyarrow.dataset.Expression object + + The expression object takes the following structure for a single condition: + (ds.field("cohort") == ) & + (ds.field("export_end_date").isin([) & (ds.field("export_end_date").isin([) & (ds.field("export_end_date").isin([ pd.DataFrame: + """Returns a parquet dataset on S3 as a pandas dataframe. + The main dataset is optionally filtered using the filter_values + converted to a ds.Expression object prior to being + read into memory. + + Args: + dataset_key (str): The URI of the parquet dataset. + s3_filesystem (S3FileSystem): A fs.S3FileSystem object + filter_values (Dict[str, str]): A dictionary object containing + the columns (keys) and values to filter the dataset on. Defaults to {}. 
+ + Returns: + pd.DataFrame: the filtered table as a pandas dataframe + """ + ds_filter = convert_filter_values_to_expression(filter_values=filter_values) + # Create the dataset object pointing to the S3 location + table_source = dataset_key.split("s3://")[-1] + dataset = ds.dataset( + source=table_source, + filesystem=s3_filesystem, + format="parquet", + partitioning="hive", # allows us to read in partitions as columns + ) + + # Apply any filter and read the dataset into a table + filtered_table = dataset.to_table(filter=ds_filter) + return filtered_table.to_pandas() def get_duplicates(compare_obj: datacompy.Compare, namespace: str) -> pd.DataFrame: @@ -266,48 +551,6 @@ def compare_column_names( return compare_msg -def get_data_types_to_compare( - s3: boto3.client, bucket_name: str, staging_namespace: str, main_namespace: str -) -> list: - """This gets the common data types to run the comparison of the parquet datasets from - the two namespaced paths on based on the folders in the s3 bucket""" - staging_datatype_folders = get_folders_in_s3_bucket( - s3, bucket_name, namespace=staging_namespace - ) - main_datatype_folders = get_folders_in_s3_bucket( - s3, bucket_name, namespace=main_namespace - ) - return list(set(staging_datatype_folders) & set(main_datatype_folders)) - - -def compare_dataset_data_types( - s3: boto3.client, bucket_name: str, staging_namespace: str, main_namespace: str -) -> list: - """This looks at the current datatype folders in the S3 bucket between the - two namespaced paths and outputs a message if there are any differences - in the datatype folders""" - compare_msg = [] - staging_datatype_folders = get_folders_in_s3_bucket( - s3, bucket_name, namespace=staging_namespace - ) - main_datatype_folders = get_folders_in_s3_bucket( - s3, bucket_name, namespace=main_namespace - ) - missing_datatypes = list(set(main_datatype_folders) - set(staging_datatype_folders)) - add_datatypes = list(set(staging_datatype_folders) - set(main_datatype_folders)) - - if missing_datatypes: - compare_msg.append( - f"Staging dataset has the following missing data types: {str(missing_datatypes)}" - ) - - if add_datatypes: - compare_msg.append( - f"Staging dataset has the following additional data types: {str(add_datatypes)}" - ) - return compare_msg - - def compare_datasets_and_output_report( data_type: str, staging_dataset: pd.DataFrame, @@ -355,7 +598,7 @@ def add_additional_msg_to_comparison_report( comparison_report (str): report generated using datacompy add_msgs (list): list of additional messages to include at the bottom of the report msg_type (str): category of message, current available ones are - ["column_name_diff", "data_type_diff"] + ["column_name_diff"] Returns: str: updated comparison report with more specific messages @@ -368,48 +611,41 @@ def add_additional_msg_to_comparison_report( f"Column Name Differences\n" f"-----------------------\n\n{joined_add_msgs}" ) - elif msg_type == "data_type_diff": - updated_comparison_report = ( - f"{comparison_report}" - f"Data Type Differences between the namespaces\n" - f"--------------------------------------------\n\n{joined_add_msgs}" - ) else: - raise ValueError( - "msg_type param must be one of 'column_name_diff', 'data_type_diff'" - ) + raise ValueError("msg_type param must be one of 'column_name_diff'") return updated_comparison_report -def is_valid_dataset(dataset: pd.DataFrame, namespace: str) -> dict: +def check_for_valid_dataset(dataset: pd.DataFrame, namespace: str) -> None: """Checks whether the individual dataset is valid under 
the following criteria: - no duplicated columns - - dataset is not empty (aka has columns) + - dataset is not empty before it can go through the comparison Args: dataset (pd.DataFrame): dataset to be validated namespace (str): namespace for the dataset - Returns: - dict: containing boolean of the validation result and string message + Raises: + ValueError: When dataset is empty (no columns, no rows or no rows, columns) + ValueError: When dataset has duplicated columns """ # Check that datasets have no emptiness, duplicated columns, or have columns in common - if len(dataset.columns) == 0: - msg = f"{namespace} dataset has no data. Comparison cannot continue." - return {"result": False, "msg": msg} + if dataset.empty: + raise ValueError( + f"The {namespace} dataset is empty. Comparison cannot continue." + ) elif get_duplicated_columns(dataset): - msg = ( + raise ValueError( f"{namespace} dataset has duplicated columns. Comparison cannot continue.\n" f"Duplicated columns:{str(get_duplicated_columns(dataset))}" ) - return {"result": False, "msg": msg} - else: - msg = f"{namespace} dataset has been validated." - return {"result": True, "msg": msg} def compare_datasets_by_data_type( + s3: boto3.client, + cfn_bucket: str, + input_bucket: str, parquet_bucket: str, staging_namespace: str, main_namespace: str, @@ -419,12 +655,18 @@ def compare_datasets_by_data_type( """This runs the bulk of the comparison functions from beginning to end by data type Args: + s3 (boto3.client): s3 client connection + cfn_bucket (str): name of the bucket containing the integration test exports + input_bucket (str): name of the bucket containing the input data parquet_bucket (str): name of the bucket containing the parquet datasets staging_namespace (str): name of namespace containing the "new" data main_namespace (str): name of namespace containing the "established" data s3_filesystem (fs.S3FileSystem): filesystem instantiated by aws credentials data_type (str): data type to be compared for the given datasets + Raises: + ValueError: When the staging and main datasets have no columns in common + Returns: dict: compare_obj: the datacompy.Compare obj on the two datasets @@ -434,6 +676,13 @@ def compare_datasets_by_data_type( f"\n\nParquet Dataset Comparison running for Data Type: {data_type}" f"\n-----------------------------------------------------------------\n\n" ) + filter_values = get_exports_filter_values( + s3=s3, + data_type=data_type, + input_bucket=input_bucket, + cfn_bucket=cfn_bucket, + staging_namespace=staging_namespace, + ) staging_dataset = get_parquet_dataset( dataset_key=get_parquet_dataset_s3_path( parquet_bucket, staging_namespace, data_type @@ -441,29 +690,19 @@ def compare_datasets_by_data_type( s3_filesystem=s3_filesystem, ) main_dataset = get_parquet_dataset( + filter_values=filter_values, dataset_key=get_parquet_dataset_s3_path( parquet_bucket, main_namespace, data_type ), s3_filesystem=s3_filesystem, ) # go through specific validation for each dataset prior to comparison - staging_is_valid_result = is_valid_dataset(staging_dataset, staging_namespace) - main_is_valid_result = is_valid_dataset(main_dataset, main_namespace) - if ( - staging_is_valid_result["result"] == False - or main_is_valid_result["result"] == False - ): - comparison_report = ( - f"{staging_is_valid_result['msg']}\n{main_is_valid_result['msg']}" - ) - compare = None + check_for_valid_dataset(staging_dataset, staging_namespace) + check_for_valid_dataset(main_dataset, main_namespace) + # check that they have columns in common to 
compare - elif not has_common_cols(staging_dataset, main_dataset): - comparison_report = ( - f"{staging_namespace} dataset and {main_namespace} have no columns in common." - f" Comparison cannot continue." - ) - compare = None + if not has_common_cols(staging_dataset, main_dataset): + raise ValueError("Datasets have no common columns to merge on.") else: logger.info( f"{staging_namespace} dataset memory usage:" @@ -492,141 +731,129 @@ def compare_datasets_by_data_type( } +def has_parquet_files( + s3: boto3.client, bucket_name: str, namespace: str, data_type: str +) -> bool: + """Quick check that a s3 folder location has parquet data + + Args: + s3 (boto3.client): s3 client connection + bucket_name (str): name of the bucket + namespace (str): namespace of the path to the files + data_type (str): data type + + Returns: + bool: Whether this folder location has data + """ + # List objects within a specific S3 bucket and prefix (folder) + response = s3.list_objects_v2( + Bucket=bucket_name, Prefix=os.path.join(namespace, "parquet", data_type) + ) + + # Check if 'Contents' is in the response, which means there are objects in the folder + if "Contents" in response: + for obj in response["Contents"]: + # Check if the object key ends with .parquet + if obj["Key"].endswith(".parquet"): + return True + + return False + + +def upload_reports_to_s3( + s3: boto3.client, + reports: List[NamedTuple], + data_type: str, + parquet_bucket: str, + staging_namespace: str, +) -> None: + """Uploads the various comparison reports to S3 bucket. + + Args: + s3 (boto3.client): s3 client connection + reports (List[NamedTuple]): List of report which contain content(str) and + file_name(str). Content is the string body of the report and file_name is the + name of the file to be saved to S3. 
+ data_type (str): data type to be compared for the given datasets + parquet_bucket (str): name of the bucket containing the parquet datasets + staging_namespace (str): name of namespace containing the "new" data + """ + for report in reports: + if report.content: + s3.put_object( + Bucket=parquet_bucket, + Key=get_s3_file_key_for_comparison_results( + staging_namespace=staging_namespace, + data_type=data_type, + file_name=report.file_name, + ), + Body=report.content, + ) + + def main(): args = read_args() s3 = boto3.client("s3") aws_session = boto3.session.Session(region_name="us-east-1") fs = get_S3FileSystem_from_session(aws_session) data_type = args["data_type"] - - data_types_to_compare = get_data_types_to_compare( + logger.info(f"Running comparison report for {data_type}") + staging_has_parquet_files = has_parquet_files( s3, - args["parquet_bucket"], - main_namespace=args["main_namespace"], - staging_namespace=args["staging_namespace"], + bucket_name=args["parquet_bucket"], + namespace=args["staging_namespace"], + data_type=data_type, ) - data_types_diff = compare_dataset_data_types( + main_has_parquet_files = has_parquet_files( s3, - args["parquet_bucket"], - main_namespace=args["main_namespace"], - staging_namespace=args["staging_namespace"], + bucket_name=args["parquet_bucket"], + namespace=args["main_namespace"], + data_type=data_type, ) - if data_types_to_compare: - logger.info(f"Running comparison report for {data_type}") + if staging_has_parquet_files and main_has_parquet_files: compare_dict = compare_datasets_by_data_type( + s3=s3, + cfn_bucket=args["cfn_bucket"], + input_bucket=args["input_bucket"], parquet_bucket=args["parquet_bucket"], staging_namespace=args["staging_namespace"], main_namespace=args["main_namespace"], s3_filesystem=fs, data_type=data_type, ) - # update comparison report with the data_type differences message - comparison_report = add_additional_msg_to_comparison_report( - compare_dict["comparison_report"], - add_msgs=data_types_diff, - msg_type="data_type_diff", + # List of reports with their corresponding parameters + ReportParams = namedtuple("ReportParams", ["file_name", "content"]) + staging_row_diffs = convert_dataframe_to_text( + compare_row_diffs(compare_dict["compare_obj"], namespace="staging") ) - # save comparison report to report folder in staging namespace - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=data_type, - file_name="parquet_compare.txt", - ), - Body=comparison_report, + main_row_diffs = convert_dataframe_to_text( + compare_row_diffs(compare_dict["compare_obj"], namespace="main") ) - logger.info("Comparison report saved!") - # additional report print outs - compare = compare_dict["compare_obj"] - # TODO: Find out if pandas.to_csv, or direct write to S3 - # is more efficient. s3.put_object is very slow and memory heavy - # esp. 
if using StringIO conversion - - # print out all mismatch columns - mismatch_cols_report = compare.all_mismatch() - if not mismatch_cols_report.empty: - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=data_type, - file_name="all_mismatch_cols.csv", - ), - Body=convert_dataframe_to_text(mismatch_cols_report), - ) - logger.info("Mismatch columns saved!") - # print out all staging rows that are different to main - staging_rows_report = compare_row_diffs(compare, namespace="staging") - if not staging_rows_report.empty: - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=data_type, - file_name="all_diff_staging_rows.csv", - ), - Body=convert_dataframe_to_text(staging_rows_report), - ) - logger.info("Different staging dataset rows saved!") - # print out all main rows that are different to staging - main_rows_report = compare_row_diffs(compare, namespace="main") - if not main_rows_report.empty: - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=data_type, - file_name="all_diff_main_rows.csv", - ), - Body=convert_dataframe_to_text(main_rows_report), - ) - logger.info("Different main dataset rows saved!") - - # print out all staging duplicated rows - staging_dups_report = get_duplicates(compare, namespace="staging") - if not staging_dups_report.empty: - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=data_type, - file_name="all_dup_staging_rows.csv", - ), - Body=convert_dataframe_to_text(staging_dups_report), - ) - logger.info("Staging dataset duplicates saved!") - # print out all main duplicated rows - main_dups_report = get_duplicates(compare, namespace="main") - if not main_dups_report.empty: - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=data_type, - file_name="all_dup_main_rows.csv", - ), - Body=convert_dataframe_to_text(main_dups_report), - ) - logger.info("Main dataset duplicates saved!") - else: - # update comparison report with the data_type differences message - comparison_report = add_additional_msg_to_comparison_report( - comparison_report, - add_msgs=data_types_diff, - msg_type="data_type_diff", + staging_dups = convert_dataframe_to_text( + get_duplicates(compare_dict["compare_obj"], namespace="staging") + ) + main_dups = convert_dataframe_to_text( + get_duplicates(compare_dict["compare_obj"], namespace="main") ) - print(comparison_report) - s3.put_object( - Bucket=args["parquet_bucket"], - Key=get_s3_file_key_for_comparison_results( - staging_namespace=args["staging_namespace"], - data_type=None, - file_name="data_types_compare.txt", + reports = [ + ReportParams( + content=compare_dict["comparison_report"], + file_name="parquet_compare.txt", ), - Body=comparison_report, + ReportParams( + content=staging_row_diffs, file_name="all_diff_staging_rows.csv" + ), + ReportParams(content=main_row_diffs, file_name="all_diff_main_rows.csv"), + ReportParams(content=staging_dups, file_name="all_dups_staging_rows.csv"), + ReportParams(content=main_dups, file_name="all_dups_main_rows.csv"), + ] + upload_reports_to_s3( + s3=s3, + reports=reports, + data_type=data_type, + 
parquet_bucket=args["parquet_bucket"], + staging_namespace=args["staging_namespace"], ) - logger.info("Comparison report saved!") return diff --git a/templates/glue-workflow.j2 b/templates/glue-workflow.j2 index 4a9d3e2f..c5ce9e9b 100644 --- a/templates/glue-workflow.j2 +++ b/templates/glue-workflow.j2 @@ -73,6 +73,15 @@ Parameters: Type: String Description: The name of the "main" namespace + S3SourceBucketName: + Type: String + Description: Name of the S3 bucket where source data are stored. + + CloudformationBucketName: + Type: String + Description: >- + The name of the bucket where the cloudformation and artifacts are stored. + Conditions: IsMainNamespace: !Equals [!Ref Namespace, "main"] IsDevelopmentNamespace: !Not [!Equals [!Ref Namespace, "main"]] @@ -273,8 +282,10 @@ Resources: "--data-type": {{ "{}".format(dataset["table_name"]) }} "--main-namespace": !Ref CompareParquetMainNamespace "--staging-namespace": !Ref CompareParquetStagingNamespace + "--input-bucket": !Ref S3SourceBucketName + "--cfn-bucket": !Ref CloudformationBucketName "--parquet-bucket": !Ref ParquetBucketName - "--additional-python-modules": "datacompy~=0.8" + "--additional-python-modules": "datacompy~=0.8 flask~=2.0 flask-cors~=3.0" {% endfor %} Description: This trigger runs after completion of all JSON to Parquet jobs Type: CONDITIONAL diff --git a/tests/Dockerfile.aws_glue_3 b/tests/Dockerfile.aws_glue_3 index 209d83f2..cc0bf7aa 100644 --- a/tests/Dockerfile.aws_glue_3 +++ b/tests/Dockerfile.aws_glue_3 @@ -1,4 +1,4 @@ FROM amazon/aws-glue-libs:glue_libs_3.0.0_image_01 -RUN pip3 install moto~=4.1 datacompy~=0.8 pytest-datadir ecs_logging~=2.0 +RUN pip3 install moto~=4.1 datacompy~=0.8 pytest-datadir ecs_logging~=2.0 flask~=2.0 flask-cors~=3.0 ENTRYPOINT ["bash", "-l"] diff --git a/tests/conftest.py b/tests/conftest.py index 244bc00b..e3a16a70 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,52 +1,40 @@ -import os -from unittest import mock - import boto3 -import pytest import pandas as pd -from pyarrow import fs, parquet +import pytest from moto import mock_s3 +from pyarrow import parquet -@pytest.fixture(scope="function") -def mock_aws_credentials(): - """Mocked AWS Credentials for moto.""" - os.environ["AWS_ACCESS_KEY_ID"] = "testing" - os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" - os.environ["AWS_SECURITY_TOKEN"] = "testing" - os.environ["AWS_SESSION_TOKEN"] = "testing" - os.environ["AWS_DEFAULT_REGION"] = "us-east-1" - - -@pytest.fixture(scope="function") -def s3(mock_aws_credentials): +@pytest.fixture() +def mock_aws_credentials(monkeypatch): + """A mock AWS credentials environment.""" + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing") + monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing") + monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing") + monkeypatch.setenv("AWS_SESSION_TOKEN", "testing") + monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1") + monkeypatch.setenv("AWS_REGION", "us-east-1") + + +@pytest.fixture +def s3(): with mock_s3(): - yield boto3.client("s3", region_name="us-east-1") + s3_client = boto3.client("s3", region_name="us-east-1") + yield s3_client -@pytest.fixture(scope="function") +@pytest.fixture def mock_aws_session(mock_aws_credentials): with mock_s3(): - yield boto3.session.Session(region_name="us-east-1") + aws_session = boto3.session.Session(region_name="us-east-1") + yield aws_session -@pytest.fixture() +@pytest.fixture def parquet_bucket_name(): yield "test-parquet-bucket" -@pytest.fixture(scope="function") -def mock_s3_filesystem(mock_aws_session): - with 
mock_s3(): - session_credentials = mock_aws_session.get_credentials() - yield fs.S3FileSystem( - region="us-east-1", - access_key=session_credentials.access_key, - secret_key=session_credentials.secret_key, - session_token=session_credentials.token, - ) - - @pytest.fixture(scope="function") def valid_staging_parquet_object(tmpdir_factory, valid_staging_dataset): filename = str(tmpdir_factory.mktemp("data_folder").join("df.parquet")) @@ -61,10 +49,27 @@ def dataset_fixture(request): return request.getfixturevalue(request.param) +@pytest.fixture +def mock_s3_environment(mock_s3_bucket): + """This allows us to persist the bucket and s3 client + """ + with mock_s3(): + s3 = boto3.client('s3', region_name='us-east-1') + s3.create_bucket(Bucket=mock_s3_bucket) + yield s3 + + +@pytest.fixture +def mock_s3_bucket(): + bucket_name = 'test-bucket' + yield bucket_name + + @pytest.fixture() def valid_staging_dataset(): yield pd.DataFrame( { + "ParticipantIdentifier": ["X000000", "X000001", "X000002"], "LogId": [ "44984262767", "46096730542", @@ -90,6 +95,7 @@ def valid_staging_dataset(): def valid_main_dataset(): yield pd.DataFrame( { + "ParticipantIdentifier": ["X000000", "X000001", "X000002"], "LogId": [ "44984262767", "46096730542", @@ -130,6 +136,7 @@ def staging_dataset_with_missing_cols(): def staging_dataset_with_add_cols(): yield pd.DataFrame( { + "ParticipantIdentifier": ["X000000", "X000001", "X000002"], "LogId": [ "44984262767", "46096730542", @@ -156,46 +163,17 @@ def staging_dataset_with_add_cols(): def staging_dataset_with_no_common_cols(): yield pd.DataFrame( { - "ParticipantIdentifier": [ - "MDH-9352-3209", - "MDH-9352-3209", - "MDH-9352-3209", - ], "Steps": ["866", "6074", "5744"], "OriginalDuration": ["768000", "2256000", "2208000"], } ) -@pytest.fixture() -def staging_dataset_with_diff_data_type_cols(): - yield pd.DataFrame( - { - "LogId": [ - "44984262767", - "46096730542", - "51739302864", - ], - "StartDate": [ - "2021-12-24T14:27:39+00:00", - "2022-02-18T08:26:54+00:00", - "2022-10-28T11:58:50+00:00", - ], - "EndDate": [ - "2021-12-24T14:40:27+00:00", - "2022-02-18T09:04:30+00:00", - "2022-10-28T12:35:38+00:00", - ], - "ActiveDuration": [768000, 2256000, 2208000], - "Calories": [89.0, 473.0, 478.0], - } - ) - - @pytest.fixture() def staging_dataset_with_diff_num_of_rows(): yield pd.DataFrame( { + "ParticipantIdentifier": ["X000000"], "LogId": ["44984262767"], "StartDate": ["2021-12-24T14:27:39+00:00"], "EndDate": ["2021-12-24T14:40:27+00:00"], @@ -231,21 +209,11 @@ def staging_dataset_with_dup_indexes(): ) -@pytest.fixture() -def staging_dataset_with_all_col_val_diff(): - yield pd.DataFrame( - { - "LogId": ["44984262767", "44984262767"], - "StartDate": ["2021-12-24T14:27:39+00:00", "2021-12-24T14:27:39+00:00"], - "EndDate": ["TESTING1", "TESTING2"], - } - ) - - @pytest.fixture() def staging_dataset_with_empty_columns(): - return pd.DataFrame( + yield pd.DataFrame( { + "ParticipantIdentifier": [], "LogId": [], "StartDate": [], "EndDate": [], @@ -255,7 +223,7 @@ def staging_dataset_with_empty_columns(): @pytest.fixture() def staging_dataset_empty(): - return pd.DataFrame() + yield pd.DataFrame() def pytest_addoption(parser): diff --git a/tests/test_compare_parquet_datasets.py b/tests/test_compare_parquet_datasets.py index 6ae9dc1b..2e4752a6 100644 --- a/tests/test_compare_parquet_datasets.py +++ b/tests/test_compare_parquet_datasets.py @@ -1,16 +1,98 @@ -import argparse +from collections import namedtuple +import json +import re +import zipfile +from io import BytesIO from unittest 
import mock

 import datacompy
-from moto import mock_s3
 import pandas as pd
-from pandas.testing import assert_frame_equal
+import pyarrow
+import pyarrow.dataset as ds
 import pytest
+from moto import mock_s3
+from moto.server import ThreadedMotoServer
+from pandas.testing import assert_frame_equal
 from pyarrow import fs, parquet

 from src.glue.jobs import compare_parquet_datasets as compare_parquet


+@pytest.fixture(scope="module")
+def mock_moto_server():
+    """A moto server to mock S3 interactions.
+
+    We cannot use the standard moto mocks here because pyarrow's S3 FileSystem
+    is not based on boto3 at all. Instead we use the moto_server feature
+    (http://docs.getmoto.org/en/latest/docs/getting_started.html#stand-alone-server-mode),
+    which gives us an endpoint URL that can be used to construct a
+    pyarrow S3FileSystem that interacts with the moto server.
+
+    References:
+        https://github.com/apache/arrow/issues/31811
+    """
+
+    server = ThreadedMotoServer(port=3000)
+    server.start()
+    yield "http://127.0.0.1:3000"
+    server.stop()
+
+
+@pytest.fixture
+def mock_s3_for_filesystem(mock_aws_session, mock_moto_server):
+    s3_client = mock_aws_session.client(
+        "s3", region_name="us-east-1", endpoint_url=mock_moto_server
+    )
+    yield s3_client
+
+
+@pytest.fixture
+def mock_s3_filesystem(mock_aws_credentials, mock_aws_session, mock_moto_server):
+    session_credentials = mock_aws_session.get_credentials()
+    filesystem = fs.S3FileSystem(
+        region="us-east-1",
+        access_key=session_credentials.access_key,
+        secret_key=session_credentials.secret_key,
+        session_token=session_credentials.token,
+        endpoint_override=mock_moto_server,
+    )
+    yield filesystem
+
+
+def add_data_to_mock_bucket(
+    mock_s3_client: "boto3.client",
+    input_data: pd.DataFrame,
+    mock_bucket_name: str,
+    dataset_key: str,
+) -> None:
+    """Helper function that creates a mock bucket and
+    adds test data to the mock s3 bucket
+
+    Args:
+        mock_s3_client (boto3.client): mock s3 client
+        input_data (pd.DataFrame): test data
+        mock_bucket_name (str): mock s3 bucket name to use
+        dataset_key (str): path in mock s3 bucket to put data
+    """
+    mock_s3_client.create_bucket(Bucket=mock_bucket_name)
+    # Create a sample dataframe and upload as a parquet dataset
+    buffer = BytesIO()
+    table = pyarrow.Table.from_pandas(input_data)
+    parquet.write_table(table, buffer)
+    buffer.seek(0)
+    mock_s3_client.put_object(
+        Bucket=mock_bucket_name, Key=dataset_key, Body=buffer.getvalue()
+    )
+    # Ensure the object is uploaded
+    obj_list = mock_s3_client.list_objects_v2(Bucket=mock_bucket_name)
+    assert any(obj["Key"] == dataset_key for obj in obj_list.get("Contents", []))
+
+    # Directly access the S3 object to ensure it's there
+    response = mock_s3_client.get_object(Bucket=mock_bucket_name, Key=dataset_key)
+    assert response["Body"].read() is not None
+
+
 def test_that_validate_args_raises_exception_when_input_value_is_empty_string():
     with pytest.raises(ValueError):
         compare_parquet.validate_args(value="")

@@ -82,72 +164,527 @@ def test_that_get_S3FileSystem_from_session_returns_filesystem_when_credentials_
     assert isinstance(filesystem, fs.S3FileSystem)


-@mock_s3
-def test_that_get_parquet_dataset_raises_attr_error_if_no_datasets_exist(
-    s3, mock_s3_filesystem, parquet_bucket_name
+@pytest.mark.parametrize(
+    "filename,expected",
+    [
+        ("HealthKitV2ActivitySummaries_20221026-20221028.json", "2022-10-28T00:00:00"),
+        ("EnrolledParticipants_20221027.json", "2022-10-27T00:00:00"),
+        (
+            "HealthKitV2Samples_WalkingStepLength_Deleted_20221023-20221024.json",
+            "2022-10-24T00:00:00",
+        ),
( + "HealthKitV2Samples_WalkingStepLength_Deleted_20221024.json", + "2022-10-24T00:00:00", + ), + ( + "HealthKitV2Samples.json", + None, + ), + ( + "", + None, + ), + ( + None, + None, + ), + ], + ids=[ + "filename_with_date_range", + "filename_with_end_date", + "filename_with_subtype_and_date_range", + "filename_with_subtype_and_end_date", + "filename_with_no_date", + "filename_is_empty", + "filename_is_none", + ], +) +def test_that_get_export_end_date_returns_expected(filename, expected): + result = compare_parquet.get_export_end_date(filename) + assert expected == result + + +@pytest.mark.parametrize( + "s3_uri,expected", + [ + ( + "s3://recover-input-data/main/pediatric_v1/2024-06-09T00:15:TEST", + "pediatric_v1", + ), + ("s3://recover-input-data/main/adults_v1/2024-06-09T00:12:49TEST", "adults_v1"), + ("s3://recover-input-data/main/2024-06-09T00:12:49TEST", None), + ], + ids=[ + "peds_cohort", + "adults_cohort", + "no_cohort", + ], +) +def test_that_get_cohort_from_s3_uri_returns_expected(s3_uri, expected): + result = compare_parquet.get_cohort_from_s3_uri(s3_uri) + assert expected == result + + +@pytest.mark.parametrize( + "filename,expected", + [ + ( + "HealthKitV2ActivitySummaries_20221026-20221028.json", + "dataset_healthkitv2activitysummaries", + ), + ("EnrolledParticipants_20221027.json", "dataset_enrolledparticipants"), + ( + "HealthKitV2Samples_WalkingStepLength_Deleted_20221024.json", + None, + ), + ( + "HealthKitV2Workouts_Deleted_20221024.json", + None, + ), + ( + "NonexistentDataType_20221024.json", + None, + ), + ], + ids=[ + "data_type_with_date_range", + "data_type", + "data_type_with_deleted_subtype", + "deleted_data_type", + "invalid_data_type", + ], +) +def test_that_get_data_type_from_filename_returns_expected(filename, expected): + result = compare_parquet.get_data_type_from_filename(filename) + assert expected == result + + +def test_that_get_json_files_in_zip_from_s3_returns_expected_filelist( + mock_s3_environment, mock_s3_bucket ): - file_key = "staging/parquet/dataset_fitbitactivitylogs/test.parquet" - with mock.patch.object(parquet, "read_table", return_value=None) as mock_method: - with pytest.raises(AttributeError): - parquet_dataset = compare_parquet.get_parquet_dataset( - dataset_key=f"{parquet_bucket_name}/{file_key}", - s3_filesystem=mock_s3_filesystem, - ) + # Create a zip file with JSON files + zip_buffer = BytesIO() + with zipfile.ZipFile(zip_buffer, "w") as z: + z.writestr("file1.json", '{"key": "value"}') + z.writestr("file2.json", '{"key2": "value2"}') + z.writestr("Manifest.json", '{"key2": "value2"}') + z.writestr("/folder/test", '{"key2": "value2"}') + z.writestr("empty.json", "") + + zip_buffer.seek(0) + + # Upload the zip file to the mock S3 bucket + s3_uri = f"s3://{mock_s3_bucket}/test.zip" + mock_s3_environment.put_object( + Bucket=mock_s3_bucket, Key="test.zip", Body=zip_buffer.getvalue() + ) + result = compare_parquet.get_json_files_in_zip_from_s3( + mock_s3_environment, mock_s3_bucket, s3_uri + ) -@mock_s3 -def test_that_get_parquet_dataset_returns_dataset_if_datasets_exist( - s3, - mock_s3_filesystem, - valid_staging_parquet_object, - valid_staging_dataset, - parquet_bucket_name, + # Verify the result + assert result == ["file1.json", "file2.json"] + + +def test_that_get_integration_test_exports_json_success( + mock_s3_environment, mock_s3_bucket ): - file_key = "staging/parquet/dataset_fitbitactivitylogs/test.parquet" - with mock.patch.object( - parquet, "read_table", return_value=valid_staging_parquet_object - ) as mock_method: - 
parquet_dataset = compare_parquet.get_parquet_dataset( - dataset_key=f"{parquet_bucket_name}/{file_key}", - s3_filesystem=mock_s3_filesystem, + staging_namespace = "staging" + + # Create a sample exports.json file content + exports_content = ["file1.json", "file2.json", "file3.json"] + exports_json = json.dumps(exports_content) + + # Upload the exports.json file to the mock S3 bucket + mock_s3_environment.put_object( + Bucket=mock_s3_bucket, + Key=f"{staging_namespace}/integration_test_exports.json", + Body=exports_json, + ) + + result = compare_parquet.get_integration_test_exports_json( + mock_s3_environment, mock_s3_bucket, staging_namespace + ) + assert result == exports_content + + +def test_that_get_integration_test_exports_json_file_not_exist( + mock_s3_environment, mock_s3_bucket +): + staging_namespace = "staging" + + # Call the function and expect an exception + with pytest.raises(mock_s3_environment.exceptions.NoSuchKey): + compare_parquet.get_integration_test_exports_json( + mock_s3_environment, mock_s3_bucket, staging_namespace ) - assert_frame_equal( - parquet_dataset.reset_index(drop=True), - valid_staging_dataset.reset_index(drop=True), + +@pytest.mark.parametrize( + "json_body", + [(""), ("{invalid json}")], + ids=[ + "empty_json", + "invalid_json", + ], +) +def test_that_get_integration_test_exports_json_throws_json_decode_error( + json_body, mock_s3_environment, mock_s3_bucket +): + staging_namespace = "staging" + + # Upload an invalid exports.json file to the mock S3 bucket + mock_s3_environment.put_object( + Bucket=mock_s3_bucket, + Key=f"{staging_namespace}/integration_test_exports.json", + Body=json_body, + ) + + # Call the function and expect a JSON decode error + with pytest.raises(json.JSONDecodeError): + compare_parquet.get_integration_test_exports_json( + mock_s3_environment, mock_s3_bucket, staging_namespace ) -@mock_s3 -def test_that_get_folders_in_s3_bucket_returns_empty_list_if_no_folders( - s3, parquet_bucket_name +@pytest.mark.parametrize( + "data_type, filelist, expected_filter", + [ + ( + "dataset_healthkitv2activitysummaries", + ["s3://bucket/adults_v1/file1.zip", "s3://bucket/pediatric_v1/file2.zip"], + { + "adults_v1": ["2022-10-28T00:00:00", "2022-10-29T00:00:00"], + "pediatric_v1": ["2022-10-28T00:00:00", "2022-10-29T00:00:00"], + }, + ), + ( + "dataset_enrolledparticipants", + ["s3://bucket/adults_v1/file1.zip"], + {"adults_v1": ["2022-10-27T00:00:00"]}, + ), + ( + "dataset_healthkitv2samples", + ["s3://bucket/pediatric_v1/file1.zip"], + {"pediatric_v1": ["2022-10-29T00:00:00", "2022-10-24T00:00:00"]}, + ), + ("dataset_googlefitsamples", ["s3://bucket/adults_v1/file1.zip"], {}), + ( + "dataset_healthkitv2samples_deleted", + ["s3://bucket/pediatric_v1/file1.zip"], + {}, + ), + ], + ids=[ + "empty_filelist", + "adults_cohort_match", + "peds_cohort_match", + "no_data_type_match", + "deleted_data_type_match", + ], +) +def test_that_get_exports_filter_values_returns_expected_results( + s3, data_type, filelist, expected_filter ): - s3.create_bucket(Bucket=parquet_bucket_name) - result = compare_parquet.get_folders_in_s3_bucket( - s3, bucket_name=parquet_bucket_name, namespace="staging" + + with mock.patch.object( + compare_parquet, "get_integration_test_exports_json" + ) as patch_test_exports, mock.patch.object( + compare_parquet, "get_json_files_in_zip_from_s3" + ) as patch_get_json: + patch_test_exports.return_value = filelist + patch_get_json.return_value = [ + "HealthKitV2ActivitySummaries_20221026-20221028.json", + 
"HealthKitV2ActivitySummaries_20221027-20221029.json", + "HealthKitV2Samples_BloodPressureDiastolic_20221027-20221029.json", + "EnrolledParticipants_20221027.json", + "HealthKitV2Samples_AppleExerciseTime_20221024.json", + "HealthKitV2Samples_AppleExerciseTime_Deleted_20221024.json", + ] + + exports_filter = compare_parquet.get_exports_filter_values( + s3, + data_type, + input_bucket="test_input_bucket", + cfn_bucket="test_cfn_bucket", + staging_namespace="staging", + ) + assert exports_filter == expected_filter + + +@pytest.mark.parametrize( + "input_filter_values, expected_expression", + [ + ( + {"adults_v1": ["2022-10-28T00:00:00"]}, + (ds.field("cohort") == "adults_v1") + & (ds.field("export_end_date").isin(["2022-10-28T00:00:00"])), + ), + ( + { + "adults_v1": ["2022-10-28T00:00:00", "2022-10-29T00:00:00"], + "pediatric_v1": ["2022-10-28T00:00:00", "2022-10-29T00:00:00"], + }, + (ds.field("cohort") == "adults_v1") + & ( + ds.field("export_end_date").isin( + ["2022-10-28T00:00:00", "2022-10-29T00:00:00"] + ) + ) + | (ds.field("cohort") == "pediatric_v1") + & ( + ds.field("export_end_date").isin( + ["2022-10-28T00:00:00", "2022-10-29T00:00:00"] + ) + ), + ), + ({}, None), + ], + ids=["single_condition", "multi_condition", "no_filter_values"], +) +def test_that_convert_filter_values_to_expression_returns_correct_exp( + input_filter_values, expected_expression +): + result = compare_parquet.convert_filter_values_to_expression( + filter_values=input_filter_values ) - assert result == [] + # handle when expression is None + if expected_expression is not None: + assert result.equals(expected_expression) + else: + assert expected_expression == result -@mock_s3 -def test_that_get_folders_in_s3_bucket_returns_list_if_folder_exists( - s3, parquet_bucket_name + +@pytest.mark.parametrize( + "input_data, filter_values, expected_filtered_data", + [ + ( + pd.DataFrame( + { + "cohort": ["pediatric_v1", "adult_v1", "pediatric_v1"], + "export_end_date": [ + "2022-10-24T00:00:00", + "2022-10-25T00:00:00", + "2022-10-23T00:00:00", + ], + "value": [1, 2, 3], + } + ), + {"pediatric_v1": ["2022-10-24T00:00:00", "2022-10-23T00:00:00"]}, + pd.DataFrame( + { + "cohort": ["pediatric_v1", "pediatric_v1"], + "export_end_date": ["2022-10-24T00:00:00", "2022-10-23T00:00:00"], + "value": [1, 3], + } + ), + ), + ( + pd.DataFrame( + { + "cohort": [ + "pediatric_v1", + "adults_v1", + "adults_v1", + "pediatric_v1", + ], + "export_end_date": [ + "2022-10-24T00:00:00", + "2022-10-25T00:00:00", + "2022-10-27T00:00:00", + "2022-10-23T00:00:00", + ], + "value": [1, 2, 3, 4], + } + ), + { + "pediatric_v1": ["2022-10-24T00:00:00", "2022-10-23T00:00:00"], + "adults_v1": ["2022-10-24T00:00:00", "2022-10-25T00:00:00"], + }, + pd.DataFrame( + { + "cohort": ["pediatric_v1", "adults_v1", "pediatric_v1"], + "export_end_date": [ + "2022-10-24T00:00:00", + "2022-10-25T00:00:00", + "2022-10-23T00:00:00", + ], + "value": [1, 2, 4], + } + ), + ), + ( + pd.DataFrame( + { + "cohort": ["pediatric_v1"], + "export_end_date": [ + "2022-10-24T00:00:00", + ], + "value": [1], + } + ), + {}, + pd.DataFrame( + { + "cohort": ["pediatric_v1"], + "export_end_date": [ + "2022-10-24T00:00:00", + ], + "value": [1], + } + ), + ), + ( + pd.DataFrame(), + {}, + pd.DataFrame(), + ), + ( + pd.DataFrame( + { + "cohort": ["pediatric_v1"], + "export_end_date": [ + "2022-10-24T00:00:00", + ], + "value": [1], + } + ), + {"adults_v1": ["2022-10-24T00:00:00"]}, + pd.DataFrame( + { + "cohort": pd.Series(dtype="object"), + "export_end_date": pd.Series(dtype="object"), + 
"value": pd.Series(dtype="int64"), + } + ), + ), + ( + pd.DataFrame( + { + "cohort": pd.Series(dtype="object"), + "export_end_date": pd.Series(dtype="object"), + "value": pd.Series(dtype="int64"), + } + ), + {}, + pd.DataFrame( + { + "cohort": pd.Series(dtype="object"), + "export_end_date": pd.Series(dtype="object"), + "value": pd.Series(dtype="int64"), + } + ), + ), + ], + ids=[ + "regular_filter", + "multi_condition_filter", + "no_filter", + "empty_df_no_filter", + "empty_cols_after_filter", + "empty_cols_no_filter", + ], +) +def test_that_get_parquet_dataset_returns_expected_results( + mock_s3_for_filesystem, + mock_s3_filesystem, + mock_s3_bucket, + input_data, + filter_values, + expected_filtered_data, ): - s3.create_bucket(Bucket=parquet_bucket_name) + """This will check that the main dataset is being chunked and filtered correctly + based on what is the staging dataset + """ + dataset_key = "test_dataset/main_dataset.parquet" + + add_data_to_mock_bucket( + mock_s3_client=mock_s3_for_filesystem, + input_data=input_data, + mock_bucket_name=mock_s3_bucket, + dataset_key=dataset_key, + ) - for obj in [ - "dataset_fitbitactivitylogs", - "dataset_fitbitactivitylogs/test.txt", - "dataset_fitbitprofiles", - "dataset_fitbitactivitylogs/test2.txt", - "dataset_fitbitprofiles/test.txt", - ]: - s3.put_object(Bucket=parquet_bucket_name, Key=f"staging/parquet/{obj}") + # Call the function to test + result = compare_parquet.get_parquet_dataset( + filter_values=filter_values, + dataset_key=f"s3://{mock_s3_bucket}/{dataset_key}", + s3_filesystem=mock_s3_filesystem, + ) + + # Verify the result + pd.testing.assert_frame_equal( + result, expected_filtered_data, check_index_type=False + ) - result = compare_parquet.get_folders_in_s3_bucket( - s3, bucket_name=parquet_bucket_name, namespace="staging" + +@pytest.mark.parametrize( + "input_data, filter_values, expected_exception", + [ + ( + pd.DataFrame(), + {"adults_v1": ["2022-10-24T00:00:00"]}, + pyarrow.lib.ArrowInvalid, + ), + ( + pd.DataFrame( + { + "cohort": pd.Series(dtype="object"), + "export_end_date": pd.Series(dtype="object"), + "value": pd.Series(dtype="int64"), + } + ), + {"adults_v1": ["2022-10-24T00:00:00"]}, + pyarrow.lib.ArrowNotImplementedError, + ), + ], + ids=[ + "empty_df_before_filter", + "empty_cols_before_filter", + ], +) +def test_that_get_parquet_dataset_raises_expected_exceptions( + mock_s3_for_filesystem, + mock_s3_filesystem, + mock_s3_bucket, + input_data, + filter_values, + expected_exception, +): + """These are the test cases that will end up with pyarrow + exceptions when trying to get the dataset + """ + dataset_key = "test_dataset/main_dataset.parquet" + + add_data_to_mock_bucket( + mock_s3_client=mock_s3_for_filesystem, + input_data=input_data, + mock_bucket_name=mock_s3_bucket, + dataset_key=dataset_key, ) - assert result == ["dataset_fitbitactivitylogs", "dataset_fitbitprofiles"] + + with pytest.raises(expected_exception): + compare_parquet.get_parquet_dataset( + filter_values=filter_values, + dataset_key=f"s3://{mock_s3_bucket}/{dataset_key}", + s3_filesystem=mock_s3_filesystem, + ) + + +@mock_s3 +def test_that_get_parquet_dataset_raises_attr_error_if_no_datasets_exist( + mock_s3_filesystem, parquet_bucket_name +): + file_key = "staging/parquet/dataset_fitbitactivitylogs/test.parquet" + with mock.patch.object(ds, "dataset", return_value=None): + with pytest.raises(AttributeError): + compare_parquet.get_parquet_dataset( + dataset_key=f"{parquet_bucket_name}/{file_key}", + s3_filesystem=mock_s3_filesystem, + ) def 
 def test_that_has_common_cols_returns_false_if_no_common_cols(
@@ -183,7 +720,7 @@ def test_that_get_missing_cols_returns_list_of_cols_if_missing_cols(
     test_missing_cols = compare_parquet.get_missing_cols(
         staging_dataset_with_missing_cols, valid_main_dataset
     )
-    assert test_missing_cols == ["EndDate", "StartDate"]
+    assert test_missing_cols == ["EndDate", "ParticipantIdentifier", "StartDate"]
 
 
 def test_that_get_additional_cols_returns_empty_list_if_no_add_cols(
@@ -204,19 +741,27 @@ def test_that_get_additional_cols_returns_list_of_cols_if_add_cols(
     assert test_add_cols == ["AverageHeartRate"]
 
 
-def test_that_dataframe_to_text_returns_str(valid_staging_dataset):
-    staging_content = compare_parquet.convert_dataframe_to_text(valid_staging_dataset)
-    assert isinstance(staging_content, str)
+@pytest.mark.parametrize(
+    "input_dataset,expected_str",
+    [
+        (pd.DataFrame(dict(col1=[1, 2], col2=[3, 4])), ",col1,col2\n0,1,3\n1,2,4\n"),
+        (pd.DataFrame(), ""),
+        (pd.DataFrame(columns=["col1", "col2"]), ""),
+    ],
+    ids=["non_empty_df", "empty_df", "empty_cols"],
+)
+def test_that_dataframe_to_text_returns_expected_str(input_dataset, expected_str):
+    result = compare_parquet.convert_dataframe_to_text(input_dataset)
+    assert result == expected_str
 
 
 def test_that_dataframe_to_text_returns_valid_format_for_s3_put_object(
-    s3, parquet_bucket_name, valid_staging_dataset
+    mock_s3_bucket, mock_s3_environment, valid_staging_dataset
 ):
     # shouldn't throw a botocore.exceptions.ParamValidationError
-    s3.create_bucket(Bucket=parquet_bucket_name)
     staging_content = compare_parquet.convert_dataframe_to_text(valid_staging_dataset)
-    s3.put_object(
-        Bucket=parquet_bucket_name,
+    mock_s3_environment.put_object(
+        Bucket=mock_s3_bucket,
         Key=f"staging/parquet/dataset_fitbitactivitylogs/test.csv",
         Body=staging_content,
     )
@@ -289,7 +834,7 @@ def test_that_compare_row_diffs_returns_df_if_columns_are_not_diff(
     compare = datacompy.Compare(
         df1=staging_dataset_with_diff_num_of_rows,
         df2=valid_main_dataset,
-        join_columns="LogId",
+        join_columns=["LogId", "ParticipantIdentifier"],
         df1_name="staging",  # Optional, defaults to 'df1'
         df2_name="main",  # Optional, defaults to 'df2'
         cast_column_names_lower=False,
@@ -298,9 +843,10 @@ def test_that_compare_row_diffs_returns_df_if_columns_are_not_diff(
     main_rows = compare_parquet.compare_row_diffs(compare, namespace="main")
     assert staging_rows.empty
     assert_frame_equal(
-        main_rows.sort_values(by='LogId').reset_index(drop=True),
+        main_rows.sort_values(by="LogId").reset_index(drop=True),
         pd.DataFrame(
             {
+                "ParticipantIdentifier": ["X000001", "X000002"],
                 "LogId": [
                     "46096730542",
                     "51739302864",
@@ -316,7 +862,9 @@ def test_that_compare_row_diffs_returns_df_if_columns_are_not_diff(
                 "ActiveDuration": ["2256000", "2208000"],
                 "Calories": ["473", "478"],
             }
-        ).sort_values(by='LogId').reset_index(drop=True),
+        )
+        .sort_values(by="LogId")
+        .reset_index(drop=True),
     )
 
 
@@ -345,133 +893,42 @@ def test_that_compare_column_names_returns_msg_if_cols_are_diff(
     assert compare_msg == [
         "dataset_fitbitactivitylogs: Staging dataset has the following missing columns:\n"
-        "['ActiveDuration', 'Calories', 'EndDate', 'LogId', 'StartDate']",
+        "['ActiveDuration', 'Calories', 'EndDate', 'LogId', 'ParticipantIdentifier', 'StartDate']",
         "dataset_fitbitactivitylogs: Staging dataset has the following additional columns:\n"
-        "['OriginalDuration', 'ParticipantIdentifier', 'Steps']",
-    ]
-
-
-@mock_s3
-def test_that_get_data_types_to_compare_returns_correct_datatypes_in_common(
-    s3, parquet_bucket_name
-):
-    s3.create_bucket(Bucket=parquet_bucket_name)
-    for namespace in ["staging", "main"]:
-        s3.put_object(
-            Bucket=parquet_bucket_name,
-            Key=f"{namespace}/parquet/dataset_fitbitactivitylogs/test.txt",
-        )
-        s3.put_object(
-            Bucket=parquet_bucket_name,
-            Key=f"{namespace}/parquet/dataset_fitbitdevices/test.txt",
-        )
-
-    data_types = compare_parquet.get_data_types_to_compare(
-        s3, parquet_bucket_name, staging_namespace="staging", main_namespace="main"
-    )
-    assert set(data_types) == set(
-        ["dataset_fitbitdevices", "dataset_fitbitactivitylogs"]
-    )
-
-
-@mock_s3
-def test_that_get_data_types_to_compare_returns_empty_list_if_no_data_types_in_common(
-    s3, parquet_bucket_name
-):
-    s3.create_bucket(Bucket=parquet_bucket_name)
-    s3.put_object(
-        Bucket=parquet_bucket_name,
-        Key=f"staging/parquet/dataset_fitbitactivitylogs/test.txt",
-    )
-    s3.put_object(
-        Bucket=parquet_bucket_name,
-        Key=f"main/parquet/dataset_fitbitdevices/test.txt",
-    )
-
-    data_types = compare_parquet.get_data_types_to_compare(
-        s3, parquet_bucket_name, staging_namespace="staging", main_namespace="main"
-    )
-    assert data_types == []
-
-
-@mock_s3
-def test_that_compare_dataset_data_types_returns_empty_msg_if_datatypes_are_equal(
-    s3, parquet_bucket_name
-):
-    s3.create_bucket(Bucket=parquet_bucket_name)
-    for namespace in ["staging", "main"]:
-        s3.put_object(
-            Bucket=parquet_bucket_name,
-            Key=f"{namespace}/parquet/dataset_fitbitactivitylogs/test.txt",
-        )
-    compare_msg = compare_parquet.compare_dataset_data_types(
-        s3, parquet_bucket_name, staging_namespace="staging", main_namespace="main"
-    )
-    assert compare_msg == []
-
-
-@mock_s3
-def test_that_compare_dataset_data_types_returns_msg_if_datatypes_are_not_equal(
-    s3, parquet_bucket_name
-):
-    s3.create_bucket(Bucket=parquet_bucket_name)
-    for datatype in [
-        "dataset_fitbitactivitylogs/test.txt",
-        "dataset_fitbitintradaycombined/test.txt",
-    ]:
-        s3.put_object(Bucket=parquet_bucket_name, Key=f"staging/parquet/{datatype}")
-
-    for datatype in [
-        "dataset_fitbitactivitylogs/test.txt",
-        "dataset_fitbitdevices/test.txt",
-    ]:
-        s3.put_object(Bucket=parquet_bucket_name, Key=f"main/parquet/{datatype}")
-
-    compare_msg = compare_parquet.compare_dataset_data_types(
-        s3, parquet_bucket_name, staging_namespace="staging", main_namespace="main"
-    )
-    assert compare_msg == [
-        "Staging dataset has the following missing data types: ['dataset_fitbitdevices']",
-        "Staging dataset has the following additional data types: ['dataset_fitbitintradaycombined']",
+        "['OriginalDuration', 'Steps']",
     ]
 
 
-def test_that_is_valid_dataset_returns_true_if_dataset_is_valid(valid_staging_dataset):
-    is_valid_result = compare_parquet.is_valid_dataset(valid_staging_dataset, "staging")
-    assert is_valid_result["result"]
-    assert is_valid_result["msg"] == "staging dataset has been validated."
-
-
-def test_that_is_valid_dataset_returns_false_if_dataset_is_empty(staging_dataset_empty):
-    is_valid_result = compare_parquet.is_valid_dataset(staging_dataset_empty, "staging")
-    assert is_valid_result["result"] is False
-    assert (
-        is_valid_result["msg"]
-        == "staging dataset has no data. Comparison cannot continue."
-    )
-
-
-def test_that_is_valid_dataset_returns_false_if_dataset_has_dup_cols(
-    staging_dataset_with_dup_cols,
+@pytest.mark.parametrize(
+    "dataset_fixture,expected_error",
+    [
+        (
+            "staging_dataset_empty",
+            "The staging dataset is empty. Comparison cannot continue.",
+        ),
+        (
+            "staging_dataset_with_empty_columns",
+            "The staging dataset is empty. Comparison cannot continue.",
+        ),
+        (
+            "staging_dataset_with_dup_cols",
+            "staging dataset has duplicated columns. Comparison cannot continue.\nDuplicated columns:['EndDate']",
+        ),
+    ],
+    ids=["empty_df_no_rows_no_cols", "empty_df_no_rows", "df_dup_cols"],
+    indirect=["dataset_fixture"],
+)
+def test_that_check_for_valid_dataset_raises_exception_if_dataset_is_invalid(
+    dataset_fixture, expected_error
 ):
-    is_valid_result = compare_parquet.is_valid_dataset(
-        staging_dataset_with_dup_cols, "staging"
-    )
-    assert is_valid_result["result"] is False
-    assert is_valid_result["msg"] == (
-        "staging dataset has duplicated columns. Comparison cannot continue.\n"
-        "Duplicated columns:['EndDate']"
-    )
+    with pytest.raises(ValueError, match=re.escape(expected_error)):
+        compare_parquet.check_for_valid_dataset(dataset_fixture, "staging")
 
 
-def test_that_is_valid_dataset_returns_true_if_dataset_has_empty_cols(
-    staging_dataset_with_empty_columns,
+def test_that_check_for_valid_dataset_raises_no_exception_if_dataset_is_valid(
+    valid_staging_dataset,
 ):
-    is_valid_result = compare_parquet.is_valid_dataset(
-        staging_dataset_with_empty_columns, "staging"
-    )
-    assert is_valid_result["result"]
-    assert is_valid_result["msg"] == "staging dataset has been validated."
+    compare_parquet.check_for_valid_dataset(valid_staging_dataset, "staging")
 
 
 @pytest.mark.parametrize(
@@ -511,70 +968,237 @@ def test_that_add_additional_msg_to_comparison_report_throws_error_if_msg_type_n
     comparison_report = "some string\n\n"
     add_msgs = ["one message", "two message"]
     with pytest.raises(ValueError):
-        result = compare_parquet.add_additional_msg_to_comparison_report(
+        compare_parquet.add_additional_msg_to_comparison_report(
            comparison_report, add_msgs, msg_type="invalid_msg_type"
        )
 
 
-def test_that_compare_datasets_by_data_type_returns_correct_msg_if_input_is_empty(
-    parquet_bucket_name, staging_dataset_empty
+@mock.patch("src.glue.jobs.compare_parquet_datasets.compare_datasets_and_output_report")
+def test_that_compare_datasets_by_data_type_returns_correct_msg_if_input_is_invalid(
+    mocked_compare_datasets, s3, parquet_bucket_name, staging_dataset_empty
 ):
     with mock.patch(
         "src.glue.jobs.compare_parquet_datasets.get_parquet_dataset",
         return_value=staging_dataset_empty,
-    ) as mock_parquet:
-        compare_dict = compare_parquet.compare_datasets_by_data_type(
-            parquet_bucket=parquet_bucket_name,
-            staging_namespace="staging",
-            main_namespace="main",
-            s3_filesystem=None,
-            data_type="dataset_fitbitactivitylogs",
-        )
-        assert compare_dict["comparison_report"] == (
-            "\n\nParquet Dataset Comparison running for Data Type: dataset_fitbitactivitylogs\n"
-            "-----------------------------------------------------------------\n\n"
-            "staging dataset has no data. Comparison cannot continue.\n"
-            "main dataset has no data. Comparison cannot continue."
-        )
+    ), mock.patch(
+        "src.glue.jobs.compare_parquet_datasets.get_exports_filter_values",
+    ):
+        with pytest.raises(
+            ValueError,
+            match="The staging dataset is empty. Comparison cannot continue.",
Comparison cannot continue.", + ): + compare_parquet.compare_datasets_by_data_type( + s3=s3, + cfn_bucket="test_cfn_bucket", + input_bucket="test_input_bucket", + parquet_bucket=parquet_bucket_name, + staging_namespace="staging", + main_namespace="main", + s3_filesystem=None, + data_type="dataset_fitbitactivitylogs", + ) + mocked_compare_datasets.assert_not_called() @mock.patch("src.glue.jobs.compare_parquet_datasets.compare_datasets_and_output_report") def test_that_compare_datasets_by_data_type_calls_compare_datasets_by_data_type_if_input_is_valid( - mocked_compare_datasets, parquet_bucket_name, valid_staging_dataset + mocked_compare_datasets, parquet_bucket_name, valid_staging_dataset, s3 ): with mock.patch( "src.glue.jobs.compare_parquet_datasets.get_parquet_dataset", return_value=valid_staging_dataset, - ) as mock_parquet: + ) as patch_get_parquet_data, mock.patch( + "src.glue.jobs.compare_parquet_datasets.get_exports_filter_values", + return_value="some_filter", + ) as patch_get_filter, mock.patch( + "src.glue.jobs.compare_parquet_datasets.check_for_valid_dataset", + ) as patch_check_valid: compare_parquet.compare_datasets_by_data_type( + s3=s3, + cfn_bucket="test_cfn_bucket", + input_bucket="test_input_bucket", parquet_bucket=parquet_bucket_name, staging_namespace="staging", main_namespace="main", s3_filesystem=None, data_type="dataset_fitbitactivitylogs", ) - mocked_compare_datasets.assert_called_once() + patch_get_parquet_data.assert_has_calls( + [ + mock.call( + dataset_key=f"s3://{parquet_bucket_name}/staging/parquet/dataset_fitbitactivitylogs", + s3_filesystem=None, + ), + mock.call( + filter_values="some_filter", + dataset_key=f"s3://{parquet_bucket_name}/main/parquet/dataset_fitbitactivitylogs", + s3_filesystem=None, + ), + ] + ) + patch_get_filter.assert_called_once_with( + s3=s3, + data_type="dataset_fitbitactivitylogs", + input_bucket="test_input_bucket", + cfn_bucket="test_cfn_bucket", + staging_namespace="staging", + ) + patch_check_valid.assert_has_calls( + [ + mock.call(valid_staging_dataset, "staging"), + mock.call(valid_staging_dataset, "main"), + ] + ) + mocked_compare_datasets.assert_called_once_with( + data_type="dataset_fitbitactivitylogs", + staging_dataset=valid_staging_dataset, + main_dataset=valid_staging_dataset, + staging_namespace="staging", + main_namespace="main", + ) @mock.patch("src.glue.jobs.compare_parquet_datasets.compare_datasets_and_output_report") -@mock.patch( - "src.glue.jobs.compare_parquet_datasets.has_common_cols", return_value=False -) -def test_that_compare_datasets_by_data_type_does_not_call_compare_datasets_by_data_type_if_input_has_no_common_cols( - mocked_has_common_cols, +def test_that_compare_datasets_by_data_type_raises_exception_if_input_has_no_common_cols( mocked_compare_datasets, parquet_bucket_name, valid_staging_dataset, + s3, ): with mock.patch( "src.glue.jobs.compare_parquet_datasets.get_parquet_dataset", return_value=valid_staging_dataset, - ) as mock_parquet: - compare_parquet.compare_datasets_by_data_type( - parquet_bucket=parquet_bucket_name, - staging_namespace="staging", - main_namespace="main", - s3_filesystem=None, - data_type="dataset_fitbitactivitylogs", + ), mock.patch( + "src.glue.jobs.compare_parquet_datasets.get_exports_filter_values", + ), mock.patch( + "src.glue.jobs.compare_parquet_datasets.has_common_cols", + return_value=False, + ): + with pytest.raises( + ValueError, match="Datasets have no common columns to merge on." 
+        ):
+            compare_parquet.compare_datasets_by_data_type(
+                s3=s3,
+                cfn_bucket="test_cfn_bucket",
+                input_bucket="test_input_bucket",
+                parquet_bucket=parquet_bucket_name,
+                staging_namespace="staging",
+                main_namespace="main",
+                s3_filesystem=None,
+                data_type="dataset_fitbitactivitylogs",
+            )
+    mocked_compare_datasets.assert_not_called()
+
+
+def test_that_has_parquet_files_returns_false_with_incorrect_location(
+    mock_s3_environment, mock_s3_bucket
+):
+    mock_s3_environment.put_object(
+        Bucket=mock_s3_bucket, Key="incorrect_location/file1.parquet", Body="data"
+    )
+    assert (
+        compare_parquet.has_parquet_files(
+            mock_s3_environment, mock_s3_bucket, "test", "test_data"
+        )
+        == False
+    )
+
+
+def test_that_has_parquet_files_returns_false_with_no_files(
+    mock_s3_environment, mock_s3_bucket
+):
+    assert (
+        compare_parquet.has_parquet_files(
+            mock_s3_environment, mock_s3_bucket, "test", "test_data"
+        )
+        == False
+    )
+
+
+def test_that_has_parquet_files_returns_false_with_no_parquet_files(
+    mock_s3_environment, mock_s3_bucket
+):
+    mock_s3_environment.put_object(
+        Bucket=mock_s3_bucket, Key="test/parquet/test_data/file1.txt", Body="data"
+    )
+    mock_s3_environment.put_object(
+        Bucket=mock_s3_bucket, Key="test/parquet/test_data/file2.csv", Body="data"
+    )
+    assert (
+        compare_parquet.has_parquet_files(
+            mock_s3_environment, mock_s3_bucket, "test", "test_data"
+        )
+        == False
+    )
+
+
+def test_that_has_parquet_files_returns_true_with_parquet_files(
+    mock_s3_environment, mock_s3_bucket
+):
+    mock_s3_environment.put_object(
+        Bucket=mock_s3_bucket, Key="test/parquet/test_data/file1.parquet", Body="data"
+    )
+    mock_s3_environment.put_object(
+        Bucket=mock_s3_bucket, Key="test/parquet/test_data/file2.txt", Body="data"
+    )
+    assert (
+        compare_parquet.has_parquet_files(
+            mock_s3_environment, mock_s3_bucket, "test", "test_data"
+        )
+        == True
+    )
+
+
+def test_that_upload_reports_to_s3_has_expected_calls(s3):
+    """For this test, the empty dataframe should not have a report saved to S3"""
+    df1 = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df2 = pd.DataFrame()
+    df3 = pd.DataFrame({"col1": [1, 2, 3], "col2": [3, 4, 5]})
+
+    # create our test reports mirroring the functionality in main
+    ReportParams = namedtuple("ReportParams", ["file_name", "content"])
+    reports = [
+        ReportParams(
+            file_name="file1.csv",
+            content=compare_parquet.convert_dataframe_to_text(df1),
+        ),
+        ReportParams(
+            file_name="file2.csv",
+            content=compare_parquet.convert_dataframe_to_text(df2),
+        ),
+        ReportParams(
+            file_name="file3.csv",
+            content=compare_parquet.convert_dataframe_to_text(df3),
+        ),
+    ]
+
+    with mock.patch.object(s3, "put_object") as mock_put_object:
+        compare_parquet.upload_reports_to_s3(
+            s3=s3,
+            reports=reports,
+            parquet_bucket="my_bucket",
+            data_type="my_data_type",
+            staging_namespace="my_namespace",
+        )
+        mock_put_object.assert_has_calls(
+            [
+                mock.call(
+                    Bucket="my_bucket",
+                    Key=compare_parquet.get_s3_file_key_for_comparison_results(
+                        staging_namespace="my_namespace",
+                        data_type="my_data_type",
+                        file_name="file1.csv",
+                    ),
+                    Body=compare_parquet.convert_dataframe_to_text(df1),
+                ),
+                mock.call(
+                    Bucket="my_bucket",
+                    Key=compare_parquet.get_s3_file_key_for_comparison_results(
+                        staging_namespace="my_namespace",
+                        data_type="my_data_type",
+                        file_name="file3.csv",
+                    ),
+                    Body=compare_parquet.convert_dataframe_to_text(df3),
+                ),
+            ]
        )
-        mocked_compare_datasets.assert_not_called()
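# Editor's note: a minimal, hypothetical sketch of the behavior pinned down by the
# convert_filter_values_to_expression tests above. It is NOT the PR's implementation;
# only the cohort/export_end_date semantics are taken from the test expectations, and
# the function name carries a "_sketch" suffix to make that explicit.
from typing import Dict, List, Optional

import pyarrow.dataset as ds


def convert_filter_values_to_expression_sketch(
    filter_values: Dict[str, List[str]]
) -> Optional[ds.Expression]:
    """Build a pyarrow filter expression, or None when no filter values are given."""
    expression = None
    for cohort, export_end_dates in filter_values.items():
        # One conjunct per cohort: rows for that cohort whose export_end_date
        # is in the allowed list.
        clause = (ds.field("cohort") == cohort) & (
            ds.field("export_end_date").isin(export_end_dates)
        )
        # OR the per-cohort clauses together, matching the "multi_condition" case.
        expression = clause if expression is None else expression | clause
    return expression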
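# Editor's note: a hypothetical sketch of the read path exercised by the
# get_parquet_dataset tests: open the key as a pyarrow dataset on the given
# filesystem and apply an optional filter expression while converting to pandas.
# The name, signature, and s3:// handling are assumptions, not the PR's code.
from typing import Optional

import pandas as pd
import pyarrow.dataset as ds
import pyarrow.fs


def get_parquet_dataset_sketch(
    dataset_key: str,
    s3_filesystem: pyarrow.fs.FileSystem,
    filter_expression: Optional[ds.Expression] = None,
) -> pd.DataFrame:
    # When an explicit filesystem is passed, ds.dataset expects a bucket/key path
    # rather than an s3:// URI, so strip the scheme if present.
    parquet_dataset = ds.dataset(
        dataset_key.removeprefix("s3://"),
        filesystem=s3_filesystem,
        format="parquet",
    )
    # to_table applies the optional filter during the scan; None means no filtering.
    return parquet_dataset.to_table(filter=filter_expression).to_pandas()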
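# Editor's note: a hypothetical sketch consistent with the has_parquet_files tests:
# list the keys under "{namespace}/parquet/{data_type}/" and report whether any of
# them is a parquet file. The signature mirrors how the tests call the function
# (a boto3-style S3 client is assumed); the body is an illustration only.
def has_parquet_files_sketch(
    s3_client, bucket: str, namespace: str, data_type: str
) -> bool:
    response = s3_client.list_objects_v2(
        Bucket=bucket, Prefix=f"{namespace}/parquet/{data_type}/"
    )
    # "Contents" is absent from the response when no object matches the prefix.
    return any(
        obj["Key"].endswith(".parquet") for obj in response.get("Contents", [])
    )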