diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py
index a08c6365c..64e9e6ca1 100644
--- a/src/imputation/imputation_main.py
+++ b/src/imputation/imputation_main.py
@@ -7,6 +7,7 @@
 from src.imputation import imputation_helpers as hlp
 from src.imputation import tmi_imputation as tmi
 from src.staging.validation import load_schema
+from src.imputation.pg_conversion import run_pg_conversion, pg_to_pg_mapper
 from src.imputation.apportionment import run_apportionment
 from src.imputation.short_to_long import run_short_to_long
 from src.imputation.MoR import run_mor
@@ -21,7 +22,8 @@
 def run_imputation(
     df: pd.DataFrame,
     manual_trimming_df: pd.DataFrame,
-    mapper: pd.DataFrame,
+    pg_num_alpha: pd.DataFrame,
+    sic_pg_num: pd.DataFrame,
     backdata: pd.DataFrame,
     config: Dict[str, Any],
     write_csv: Callable,
@@ -48,6 +50,11 @@ def run_imputation(
     Returns:
         pd.DataFrame: dataframe with the imputed columns updated
     """
+    # Carry out product group conversion
+    df = run_pg_conversion(
+        df, pg_num_alpha, sic_pg_num, pg_column="201"
+    )
+
     # Apportion cols 4xx and 5xx to create FTE and headcount values
     df = run_apportionment(df)
@@ -92,11 +99,24 @@
     # Run MoR
     if backdata is not None:
+        # Fix for different column names on network vs hdfs
+        if NETWORK_OR_HDFS == "network":
+            # Map PG numeric to alpha in column q201
+            # This isn't done on HDFS as the column is already mapped
+            backdata = pg_to_pg_mapper(
+                backdata,
+                pg_num_alpha,
+                pg_column="q201",
+                from_col="pg_numeric",
+                to_col="pg_alpha",
+            )
+            backdata = backdata.drop("pg_numeric", axis=1)
+
         lf_target_vars = config["imputation"]["lf_target_vars"]
         df, links_df = run_mor(df, backdata, to_impute_cols, lf_target_vars, config)

     # Run TMI for long forms and short forms
-    imputed_df, qa_df = tmi.run_tmi(df, mapper, config)
+    imputed_df, qa_df = tmi.run_tmi(df, config)

     # After imputation, correction to ignore the "604" == "No" in any records with
     # Status "check needed"
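Note on the hunk above: on the network environment the backdata q201 column still holds numeric PGs, so it is converted with pg_to_pg_mapper before MoR runs. A minimal, self-contained sketch of that step, using made-up mapper rows standing in for the real pg_num_alpha.csv contents:

import pandas as pd

from src.imputation.pg_conversion import pg_to_pg_mapper

# Toy backdata with numeric PG codes in column q201 (values illustrative)
backdata = pd.DataFrame({"reference": [1, 2], "q201": [45, 49]})
# Toy numeric-to-alpha mapper (the pipeline loads this from pg_num_alpha.csv)
pg_num_alpha = pd.DataFrame({"pg_numeric": [45, 49], "pg_alpha": ["AC", "AD"]})

backdata = pg_to_pg_mapper(
    backdata,
    pg_num_alpha,
    pg_column="q201",
    from_col="pg_numeric",
    to_col="pg_alpha",
)
# q201 now holds alpha codes ("AC", "AD"); the helper pg_numeric column is dropped
backdata = backdata.drop("pg_numeric", axis=1)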
diff --git a/src/imputation/pg_conversion.py b/src/imputation/pg_conversion.py
new file mode 100644
index 000000000..fa6d0556b
--- /dev/null
+++ b/src/imputation/pg_conversion.py
@@ -0,0 +1,161 @@
+import pandas as pd
+import logging
+import numpy as np
+
+PgLogger = logging.getLogger(__name__)
+
+
+def sic_to_pg_mapper(
+    df: pd.DataFrame,
+    sicmapper: pd.DataFrame,
+    pg_column: str = "201",
+    sic_column: str = "rusic",
+    from_col: str = "SIC 2007_CODE",
+    to_col: str = "2016 > Form PG",
+):
+    """Map from SIC code to PG numeric code where PG numeric is null.
+
+    Example initial dataframe:
+        reference | 201 | rusic
+        ---------------------------
+        1         | 53  | 2500
+        2         | NaN | 1600
+        3         | NaN | 4300
+
+    returned dataframe:
+        reference | 201 | rusic
+        ---------------------------
+        1         | 53  | 2500
+        2         | 45  | 1600
+        3         | 38  | 4300
+
+    Args:
+        df (pd.DataFrame): The dataset containing all the PG numbers.
+        sicmapper (pd.DataFrame): The SIC to PG numeric mapper.
+        sic_column (str, optional): The column containing the SIC numbers.
+        from_col (str, optional): The column in the mapper that is used to map from.
+        to_col (str, optional): The column in the mapper that is used to map to.
+
+    Returns:
+        pd.DataFrame: A dataframe with all target column values mapped
+    """
+
+    df = df.copy()
+
+    # Create a mapping dictionary from the 2 columns
+    map_dict = dict(zip(sicmapper[from_col], sicmapper[to_col]))
+    # Flag all SIC numbers that don't have a corresponding map value
+    mapless_errors = []
+    for key, value in map_dict.items():
+        if str(value) == "nan":
+            mapless_errors.append(key)
+
+    if mapless_errors:
+        PgLogger.error(
+            f"Mapping doesn't exist for the following SIC numbers: {mapless_errors}"
+        )
+        raise Exception("Errors in the SIC to PG numeric mapper.")
+
+    # Map to the target column using the dictionary, null values only
+    df.loc[df[pg_column].isnull(), pg_column] = (
+        df.loc[df[pg_column].isnull(), sic_column].map(map_dict)
+    )
+
+    PgLogger.info("Product group nulls successfully mapped from SIC.")
+
+    return df
+
+
+def pg_to_pg_mapper(
+    df: pd.DataFrame,
+    mapper: pd.DataFrame,
+    pg_column: str = "201",
+    from_col: str = "pg_numeric",
+    to_col: str = "pg_alpha",
+):
+    """Map from PG numeric to PG alpha-numeric and create a new column.
+
+    The mapper used is from a file named pg_num_alpha.csv
+
+    The product group column (default: column 201) is copied to a new column,
+    "pg_numeric", and then the original column is mapped from numeric to alpha-numeric.
+
+    Example initial dataframe:
+        reference | 201
+        ----------------------
+        1         | 53
+        2         | 43
+        3         | 33
+
+    returned dataframe:
+        reference | 201 | pg_numeric
+        ------------------------------------
+        1         | E   | 53
+        2         | B   | 43
+        3         | AA  | 33
+
+
+    Args:
+        df (pd.DataFrame): The dataframe requiring mapping
+        mapper (pd.DataFrame): the PG numeric to alpha-numeric mapper
+        pg_column (str, optional): The column we want to convert (default 201).
+        from_col (str, optional): The column in the mapper that is used to map from.
+        to_col (str, optional): The column in the mapper that is used to map to.
+
+    Returns:
+        pd.DataFrame: A dataframe with all target column values mapped
+    """
+
+    df = df.copy()
+
+    # Copy the numeric PG column to a new column
+    df["pg_numeric"] = df[pg_column].copy()
+
+    # Create a mapping dictionary from the 2 columns
+    map_dict = dict(zip(mapper[from_col], mapper[to_col]))
+
+    # Flag all PGs that don't have a corresponding map value
+    mapless_errors = []
+    for key, value in map_dict.items():
+        if str(value) == "nan":
+            mapless_errors.append(key)
+
+    if mapless_errors:
+        PgLogger.error(
+            f"Mapping doesn't exist for the following product groups: {mapless_errors}"
+        )
+        raise Exception("Errors in the PG numeric to alpha-numeric mapper.")
+
+    df[pg_column] = df[pg_column].map(map_dict)
+
+    # Then convert the pg column and the new column to categorical datatypes
+    df = df.astype({pg_column: "category", "pg_numeric": "category"})
+
+    PgLogger.info("Numeric product groups successfully mapped to letters.")
+
+    return df
+
+
+def run_pg_conversion(
+    df: pd.DataFrame,
+    pg_num_alpha: pd.DataFrame,
+    sic_pg_num: pd.DataFrame,
+    pg_column: str = "201",
+):
+    """Run the product group (PG) mapping functions.
+
+    Args:
+        df (pd.DataFrame): Dataframe of full responses data
+        pg_num_alpha (pd.DataFrame): Mapper from numeric to alpha-numeric PG.
+        pg_column: The original product group column, default 201
+
+    Returns:
+        (pd.DataFrame): Dataframe with mapped values
+    """
+    # Where product group is null, map it from SIC.
+    df = sic_to_pg_mapper(df, sic_pg_num, pg_column)
+
+    # PG numeric to alpha-numeric mapping for long forms
+    df = pg_to_pg_mapper(df, pg_num_alpha, pg_column)
+
+    return df
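Taken together, the two mappers give the following end-to-end behaviour; a small sketch with toy mapper rows (the real lookups come from sic_pg_num.csv and pg_num_alpha.csv, so the codes below are purely illustrative):

import numpy as np
import pandas as pd

from src.imputation.pg_conversion import run_pg_conversion

df = pd.DataFrame({"201": [53, np.nan], "rusic": [2500, 1600]})
# Toy SIC -> numeric PG mapper, using the module's default mapper column names
sic_pg_num = pd.DataFrame(
    {"SIC 2007_CODE": [1600, 2500], "2016 > Form PG": [45, 53]}
)
# Toy numeric -> alpha mapper
pg_num_alpha = pd.DataFrame({"pg_numeric": [45, 53], "pg_alpha": ["AC", "AH"]})

df = run_pg_conversion(df, pg_num_alpha, sic_pg_num, pg_column="201")
# The null PG is first filled from SIC, then both rows are mapped to alpha;
# pg_numeric retains the numeric codes:
#     201 | pg_numeric
#     AH  | 53
#     AC  | 45

The NI responses get the same run_pg_conversion treatment in form_output_prep (below), which is why the per-output sic_to_pg_mapper fill-in steps in gb_sas.py, ni_sas.py and tau.py are removed.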
diff --git a/src/imputation/tmi_imputation.py b/src/imputation/tmi_imputation.py
index ecd170875..c3ea7eaff 100644
--- a/src/imputation/tmi_imputation.py
+++ b/src/imputation/tmi_imputation.py
@@ -3,7 +3,7 @@
 import numpy as np
 from typing import Dict, List, Tuple, Any

-from src.staging.pg_conversion import sic_to_pg_mapper
+from src.imputation.pg_conversion import sic_to_pg_mapper
 from src.imputation.impute_civ_def import impute_civil_defence
 from src.imputation import expansion_imputation as ximp
@@ -425,7 +425,6 @@ def calculate_totals(df):

 def run_longform_tmi(
     longform_df: pd.DataFrame,
-    sic_mapper: pd.DataFrame,
     config: Dict[str, Any],
 ) -> Tuple[pd.DataFrame, pd.DataFrame]:
     """Function to run imputation end to end and returns the final
@@ -434,7 +433,6 @@
     Args:
         longform_df (pd.DataFrame): the dataset filtered for long form entries
         target_variables (list): key variables
-        sic_mapper (pd.DataFrame): dataframe with sic mapper info
         config (Dict): the configuration settings
     Returns:
         final_df: dataframe with the imputed valued added
@@ -442,10 +440,7 @@
         qa_df: qa dataframe
     """
     TMILogger.info("Starting TMI long form imputation.")
-
-    # TMI Step 1: impute the Product Group
-    df = impute_pg_by_sic(longform_df, sic_mapper)
-
+    df = longform_df.copy()
     # TMI Step 2: impute for R&D type (civil or defence)
     df = impute_civil_defence(df)
@@ -520,7 +515,6 @@ def run_shortform_tmi(

 def run_tmi(
     full_df: pd.DataFrame,
-    sic_mapper: pd.DataFrame,
     config: Dict[str, Any],
 ) -> Tuple[pd.DataFrame, pd.DataFrame]:
     """Function to run imputation end to end and returns the final
@@ -528,7 +522,6 @@
     dataframe back to the pipeline
     Args:
         full_df (pd.DataFrame): the full responses spp dataframe
-        sic_mapper (pd.DataFrame): dataframe with sic to product group mapper info
         config (Dict): the configuration settings
     Returns:
         final_df(pd.DataFrame): dataframe with the imputed valued added and counts columns
@@ -553,7 +546,7 @@
     excluded_df = full_df.copy().loc[mor_mask]

     # apply TMI imputation to long forms and then short forms
-    longform_tmi_df, qa_df_long = run_longform_tmi(longform_df, sic_mapper, config)
+    longform_tmi_df, qa_df_long = run_longform_tmi(longform_df, config)
     shortform_tmi_df, qa_df_short = run_shortform_tmi(shortform_df, config)
diff --git a/src/outputs/form_output_prep.py b/src/outputs/form_output_prep.py
index 6ef352657..93aaa381c 100644
--- a/src/outputs/form_output_prep.py
+++ b/src/outputs/form_output_prep.py
@@ -1,5 +1,5 @@
 import pandas as pd
-from src.staging.pg_conversion import run_pg_conversion
+from src.imputation.pg_conversion import run_pg_conversion
 from src.staging.validation import flag_no_rand_spenders
@@ -8,7 +8,7 @@ def form_output_prep(
     weighted_df: pd.DataFrame,
     ni_full_responses: pd.DataFrame,
     pg_num_alpha: pd.DataFrame,
-    sic_pg_alpha: pd.DataFrame,
+    sic_pg_num: pd.DataFrame,
 ):
     """Prepares the data for the outputs.
@@ -46,8 +46,12 @@
         ni_full_responses["form_status"] = 600
         ni_full_responses["602"] = 100
         ni_full_responses["formtype"] = "0003"
+
+        # Update column 201 (currently PG numeric) to alpha-numeric, mapping from SIC.
         ni_full_responses = run_pg_conversion(
-            ni_full_responses, pg_num_alpha, sic_pg_alpha, target_col="201"
+            ni_full_responses,
+            pg_num_alpha,
+            sic_pg_num
         )

         # outputs_df = pd.concat([outputs_df, ni_full_responses])
diff --git a/src/outputs/gb_sas.py b/src/outputs/gb_sas.py
index 4435a465f..73ffaad8c 100644
--- a/src/outputs/gb_sas.py
+++ b/src/outputs/gb_sas.py
@@ -7,7 +7,6 @@
 import src.outputs.map_output_cols as map_o
 from src.staging.validation import load_schema
 from src.outputs.outputs_helpers import create_output_df, regions
-from src.staging.pg_conversion import sic_to_pg_mapper

 GbSasLogger = logging.getLogger(__name__)
@@ -19,7 +18,6 @@ def output_gb_sas(
     run_id: int,
     ultfoc_mapper: pd.DataFrame,
     postcode_mapper: pd.DataFrame,
-    sic_pg_num: pd.DataFrame,
 ):
     """Run the outputs module.
@@ -31,8 +29,6 @@
         run_id (int): The current run id
         ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame.
         postcode_mapper (pd.DataFrame): maps the postcode to region code
-        pg_alpha_num (pd.DataFrame): mapper of numeric PG to alpha PG
-
     """
     NETWORK_OR_HDFS = config["global"]["network_or_hdfs"]
@@ -47,20 +43,9 @@
     # Join foriegn ownership column using ultfoc mapper
     df1 = map_o.join_fgn_ownership(df1, ultfoc_mapper)

-    # Fill in numeric PG for short forms and imputed long forms
-    df1 = sic_to_pg_mapper(
-        df1,
-        sic_pg_num,
-        target_col="pg_numeric",
-        from_col="SIC 2007_CODE",
-        to_col="2016 > Form PG",
-        formtype=["0006", "0001"],
-    )
-
     # Map to the CORA statuses from the statusencoded column
     df1 = map_o.create_cora_status_col(df1)
-
     # Map the sizebands based on frozen employment
     df1 = map_o.map_sizebands(df1)
diff --git a/src/outputs/ni_sas.py b/src/outputs/ni_sas.py
index b9ea85285..717f76854 100644
--- a/src/outputs/ni_sas.py
+++ b/src/outputs/ni_sas.py
@@ -6,7 +6,7 @@
 import src.outputs.map_output_cols as map_o
 from src.staging.validation import load_schema
 from src.outputs.outputs_helpers import create_output_df
-from src.staging.pg_conversion import sic_to_pg_mapper
+from src.imputation.pg_conversion import run_pg_conversion

 OutputMainLogger = logging.getLogger(__name__)
@@ -16,8 +16,6 @@ def output_ni_sas(
     config: Dict[str, Any],
     write_csv: Callable,
     run_id: int,
-    sic_pg_num: pd.DataFrame,
-    postcode_itl_mapper: pd.DataFrame,
 ):
     """Run the outputs module.
@@ -39,18 +37,6 @@
     paths = config[f"{NETWORK_OR_HDFS}_paths"]
     output_path = paths["output_path"]

-    # Prepare the columns needed for outputs:
-
-    # Fill in numeric PG where missing
-    df = sic_to_pg_mapper(
-        df,
-        sic_pg_num,
-        target_col="pg_numeric",
-        from_col="SIC 2007_CODE",
-        to_col="2016 > Form PG",
-        formtype=["0003"],
-    )
-
     # Map the sizebands based on frozen employment
     df = map_o.map_sizebands(df)
diff --git a/src/outputs/outputs_main.py b/src/outputs/outputs_main.py
index dc93b367d..c0987d74c 100644
--- a/src/outputs/outputs_main.py
+++ b/src/outputs/outputs_main.py
@@ -57,7 +57,7 @@ def run_outputs(
         civil_defence_detailed (pd.DataFrame): Detailed descriptons of civil/defence
         sic_division_detailed (pd.DataFrame): Detailed descriptons of SIC divisions
         pg_num_alpha (pd.DataFrame): Mapper for product group conversions (num to alpha)
-        sic_pg_alpha (pd.DataFrame): Mapper for product group conversions (SIC to alpha)
+        sic_pg_num (pd.DataFrame): Mapper for product group conversions
     """
     (
@@ -69,7 +69,7 @@
         weighted_df,
         ni_full_responses,
         pg_num_alpha,
-        sic_pg_alpha,
+        sic_pg_num,
     )

     # Running short form output
@@ -110,7 +110,6 @@
         run_id,
         ultfoc_mapper,
         postcode_mapper,
-        sic_pg_num,
     )
     OutputMainLogger.info("Finished TAU output.")
@@ -124,7 +123,6 @@
         run_id,
         ultfoc_mapper,
         postcode_mapper,
-        sic_pg_num,
     )
     OutputMainLogger.info("Finished GB SAS output.")
@@ -136,8 +134,6 @@
         config,
         write_csv,
         run_id,
-        sic_pg_num,
-        postcode_mapper,
     )
     OutputMainLogger.info("Finished NI SAS output.")
diff --git a/src/outputs/tau.py b/src/outputs/tau.py
index e06c70a83..02e7ed11b 100644
--- a/src/outputs/tau.py
+++ b/src/outputs/tau.py
@@ -6,7 +6,6 @@
 import src.outputs.map_output_cols as map_o
 from src.staging.validation import load_schema
 from src.outputs.outputs_helpers import create_output_df
-from src.staging.pg_conversion import sic_to_pg_mapper

 OutputMainLogger = logging.getLogger(__name__)
@@ -18,7 +17,6 @@ def output_tau(
     run_id: int,
     ultfoc_mapper: pd.DataFrame,
     postcode_itl_mapper: pd.DataFrame,
-    sic_pg_num: pd.DataFrame,
 ):
     """Run the outputs module.
@@ -30,8 +28,6 @@
         run_id (int): The current run id
         ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame.
         postcode_itl_mapper (pd.DataFrame): maps the postcode to region code
-        pg_alpha_num (pd.DataFrame): mapper of alpha PG to numeric PG
-
     """
     NETWORK_OR_HDFS = config["global"]["network_or_hdfs"]
@@ -49,16 +45,6 @@
     # Join foriegn ownership column using ultfoc mapper
     df = map_o.join_fgn_ownership(df, ultfoc_mapper, formtype=["0001", "0006"])

-    # Fill in numeric PG for short forms and imputed long forms
-    df = sic_to_pg_mapper(
-        df,
-        sic_pg_num,
-        target_col="pg_numeric",
-        from_col="SIC 2007_CODE",
-        to_col="2016 > Form PG",
-        formtype=["0006", "0001", "0003"],
-    )
-
     # Map to the CORA statuses from the statusencoded column
     df = map_o.create_cora_status_col(df)
diff --git a/src/pipeline.py b/src/pipeline.py
index 81ded7174..83f9cccad 100644
--- a/src/pipeline.py
+++ b/src/pipeline.py
@@ -137,7 +137,8 @@
     imputed_df = run_imputation(
         full_responses,
         manual_trimming_df,
-        sic_pg_alpha,
+        pg_num_alpha,
+        sic_pg_num,
         backdata,
         config,
         write_csv,
@@ -196,7 +197,7 @@
         civil_defence_detailed,
         sic_division_detailed,
         pg_num_alpha,
-        sic_pg_alpha,
+        sic_pg_num,
     )
     MainLogger.info("Finished All Output modules.")
diff --git a/src/staging/pg_conversion.py b/src/staging/pg_conversion.py
deleted file mode 100644
index c6fc43aee..000000000
--- a/src/staging/pg_conversion.py
+++ /dev/null
@@ -1,173 +0,0 @@
-import pandas as pd
-import logging
-import numpy as np
-
-PgLogger = logging.getLogger(__name__)
-
-
-def pg_to_pg_mapper(
-    df: pd.DataFrame,
-    mapper: pd.DataFrame,
-    target_col: str = "product_group",
-    pg_column: str = "201",
-    from_col: str = "pg_numeric",
-    to_col: str = "pg_alpha",
-):
-    """This function maps all values in one column to another column
-    using a mapper file. This is applied to long forms only.
-    The default this is used for is PG numeric to letter conversion.
-
-    Args:
-        df (pd.DataFrame): The dataset containing all the PG numbers
-        mapper (pd.DataFrame): The mapper dataframe loaded using custom function
-        target_col (str, optional): The column we output the
-        mapped values to (product_group).
-        pg_column (str, optional): The column we want to convert (201).
-        from_col (str, optional): The column in the mapper that is used to map from.
-        to_col (str, optional): The column in the mapper that is used to map to.
-
-    Returns:
-        pd.DataFrame: A dataframe with all target column values mapped
-    """
-
-    filtered_df = df.copy()
-
-    if "formtype" in filtered_df.columns:
-        formtype_cond = filtered_df["formtype"] == "0001"
-        filtered_df = filtered_df[formtype_cond]
-
-    # Create a mapping dictionary from the 2 columns
-    map_dict = dict(zip(mapper[from_col], mapper[to_col]))
-    # Flag all PGs that don't have a corresponding map value
-    mapless_errors = []
-    for key, value in map_dict.items():
-        if str(value) == "nan":
-            mapless_errors.append(key)
-
-    if mapless_errors:
-        PgLogger.error(
-            f"Mapping doesnt exist for the following product groups: {mapless_errors}"
-        )
-    # Map using the dictionary taking into account the null values.
-    # Then convert to categorigal datatype
-    filtered_df[pg_column] = pd.to_numeric(filtered_df[pg_column], errors="coerce")
-    filtered_df[target_col] = filtered_df[pg_column].map(map_dict)
-    filtered_df[target_col] = filtered_df[target_col].astype("category")
-
-    df.loc[
-        filtered_df.index,
-        f"{target_col}",
-    ] = filtered_df[target_col]
-
-    PgLogger.info("Product groups successfully mapped to letters")
-
-    return df
-
-
-def sic_to_pg_mapper(
-    df: pd.DataFrame,
-    sicmapper: pd.DataFrame,
-    target_col: str = "product_group",
-    sic_column: str = "rusic",
-    from_col: str = "sic",
-    to_col: str = "pg_alpha",
-    formtype: str = ["0006"],
-):
-    """This function maps all values in one column to another column
-    using a mapper file. This is only applied for short forms and unsampled
-    refs.
-
-    The default this is used for is PG numeric to letter conversion.
-
-    Args:
-        df (pd.DataFrame): The dataset containing all the PG numbers.
-        sicmapper (pd.DataFrame): The mapper dataframe loaded using custom function.
-        target_col (str, optional): The column we output the
-        mapped values to (product_group).
-        sic_column (str, optional): The column containing the SIC numbers.
-        from_col (str, optional): The column in the mapper that is used to map from.
-        to_col (str, optional): The column in the mapper that is used to map to.
-
-    Returns:
-        pd.DataFrame: A dataframe with all target column values mapped
-    """
-
-    filtered_df = df.copy()
-
-    filtered_df = filtered_df[filtered_df["formtype"].isin(formtype)]
-
-    if "pg_numeric" in filtered_df.columns:
-        filtered_df = filtered_df[filtered_df["pg_numeric"].isnull()]
-
-    # Create a mapping dictionary from the 2 columns
-    map_dict = dict(zip(sicmapper[from_col], sicmapper[to_col]))
-    # Flag all SIC numbers that don't have a corresponding map value
-    mapless_errors = []
-    for key, value in map_dict.items():
-        if str(value) == "nan":
-            mapless_errors.append(key)
-
-    if mapless_errors:
-        PgLogger.error(
-            f"Mapping doesnt exist for the following SIC numbers: {mapless_errors}"
-        )
-    # Map to the target column using the dictionary taking into account the null values.
-    # Then convert to categorigal datatype
-    filtered_df[sic_column] = pd.to_numeric(filtered_df[sic_column], errors="coerce")
-    filtered_df[target_col] = filtered_df[sic_column].map(map_dict)
-    filtered_df[target_col] = filtered_df[target_col].astype("category")
-
-    df = df.copy()
-
-    df.loc[
-        filtered_df.index,
-        f"{target_col}",
-    ] = filtered_df[target_col]
-
-    PgLogger.info("SIC numbers successfully mapped to PG letters")
-
-    return df
-
-
-def run_pg_conversion(
-    df: pd.DataFrame,
-    pg_num_alpha: pd.DataFrame,
-    sic_pg_alpha: pd.DataFrame,
-    target_col: str = "201",
-):
-    """Run the product group mapping functions and return a
-    dataframe with the correct mapping for each formtype.
-
-    Args:
-        df (pd.DataFrame): Dataframe of full responses data
-        mapper (pd.DataFrame): The mapper file used for PG conversion
-        target_col (str, optional): The column to be created
-        which stores mapped values.
-
-    Returns:
-        (pd.DataFrame): Dataframe with mapped values
-    """
-
-    df["pg_numeric"] = df["201"].copy()
-
-    if target_col == "201":
-        target_col = "201_mapping"
-    else:
-        # Create a new column to store PGs
-        df[target_col] = np.nan
-
-    # SIC mapping for short forms
-    df = sic_to_pg_mapper(df, sic_pg_alpha, target_col=target_col)
-
-    # SIC mapping for NI
-    df = sic_to_pg_mapper(df, sic_pg_alpha, target_col=target_col, formtype=["0003"])
-
-    # PG mapping for long forms
-    df = pg_to_pg_mapper(df, pg_num_alpha, target_col=target_col)
-
-    # Overwrite the 201 column if target_col = 201
-    if target_col == "201_mapping":
-        df["201"] = df[target_col]
-        df = df.drop(columns=[target_col])
-
-    return df
diff --git a/src/staging/staging_main.py b/src/staging/staging_main.py
index 8da3cbffd..383c18d14 100644
--- a/src/staging/staging_main.py
+++ b/src/staging/staging_main.py
@@ -8,7 +8,6 @@
 # Our own modules
 from src.staging import validation as val
-from src.staging import pg_conversion as pg
 import src.staging.staging_helpers as helpers
@@ -210,16 +209,6 @@
             #     backdata_path, "./config/backdata_schema.toml"
             # )

-            # Fix for different column names on network vs hdfs
-            if network_or_hdfs == "network":
-                # Map PG numeric to alpha in column q201
-                # This isn't done on HDFS as the column is already mapped
-                backdata = pg.pg_to_pg_mapper(
-                    backdata,
-                    pg_num_alpha,
-                    target_col="q201",
-                    pg_column="q201",
-                )
             StagingMainLogger.info("Backdata File Loaded Successfully...")
         else:
             backdata = None
@@ -287,11 +276,6 @@
         mapper_path = paths["mapper_path"]
         write_csv(f"{mapper_path}/sic_pg_num.csv", sic_pg_utf_mapper)

-    # Map PG from SIC/PG numbers to column '201'.
-    full_responses = pg.run_pg_conversion(
-        full_responses, pg_num_alpha, sic_pg_alpha_mapper, target_col="201"
-    )
-
     pg_detailed_mapper = helpers.load_valdiate_mapper(
         "pg_detailed_mapper_path",
         paths,
diff --git a/tests/test_staging/test_pg_conversion.py b/tests/test_staging/test_pg_conversion.py
index a77c2b9f5..d39418fd7 100644
--- a/tests/test_staging/test_pg_conversion.py
+++ b/tests/test_staging/test_pg_conversion.py
@@ -4,76 +4,140 @@
 import pytest
 import numpy as np

-from src.staging.pg_conversion import pg_to_pg_mapper, sic_to_pg_mapper
+from src.imputation.pg_conversion import pg_to_pg_mapper, sic_to_pg_mapper


 @pytest.fixture
-def dummy_data() -> pd.DataFrame:
+def sic_dummy_data() -> pd.DataFrame:
     # Set up the dummyinput data
-    data = pd.DataFrame(
-        {"201": [0, 1, 2, 3, 4], "formtype": ["0001", "0001", "0001", "0001", "0001"]}
-    )
-    return data
+    columns = ["201", "rusic"]
+    data = [
+        [53, 2500],
+        [np.nan, 1600],
+        [np.nan, 4300],
+    ]
+
+    return pd.DataFrame(data, columns=columns)


 @pytest.fixture
-def mapper() -> pd.DataFrame:
-    # Set up the dummy mapper data
-    mapper = {
-        "pg_numeric": [0, 1, 2, 3, 4],
-        "pg_alpha": [np.nan, "A", "B", "C", "C"],
-    }
-    return pd.DataFrame(mapper)
+def sic_mapper():
+    columns = ["sic", "pg"]
+    mapper_rows = [
+        [1600, 36],
+        [2500, 95],
+        [7300, 45],
+        [2500, 53],
+    ]
+
+    # Create the DataFrame
+    return pd.DataFrame(mapper_rows, columns=columns)


 @pytest.fixture
-def expected_output() -> pd.DataFrame:
+def sic_expected_output() -> pd.DataFrame:
     # Set up the dummy output data
-    expected_output = pd.DataFrame(
-        {
-            "201": [np.nan, "A", "B", "C", "C"],
-            "formtype": ["0001", "0001", "0001", "0001", "0001"],
-        }
-    )
+    columns = ["201", "rusic"]
+    data = [
+        [53, 2500],
+        [36, 1600],
+        [np.nan, 4300],
+    ]

-    expected_output["201"] = expected_output["201"].astype("category")
-
-    return expected_output
+    return pd.DataFrame(data, columns=columns)


-@pytest.fixture
-def sic_dummy_data() -> pd.DataFrame:
-    # Set up the dummyinput data
-    data = pd.DataFrame(
-        {"rusic": [1110, 10101], "201": [np.nan, np.nan], "formtype": ["0006", "0006"]}
-    )
-    return data
+def test_sic_mapper(sic_dummy_data, sic_expected_output, sic_mapper):
+    """Tests for pg mapper function."""
+    expected_output_data = sic_expected_output

-
-@pytest.fixture
-def sic_mapper() -> pd.DataFrame:
-    # Set up the dummy mapper data
-    mapper = {
-        "sic": [1110, 10101],
-        "pg_alpha": ["A", "B"],
-    }
-    return pd.DataFrame(mapper)
+    df_result = sic_to_pg_mapper(
+        sic_dummy_data,
+        sic_mapper,
+        pg_column="201",
+        from_col="sic",
+        to_col="pg",
+    )
+
+    pd.testing.assert_frame_equal(df_result, expected_output_data)


 @pytest.fixture
-def sic_expected_output() -> pd.DataFrame:
-    # Set up the dummy output data
-    expected_output = pd.DataFrame(
-        {"rusic": [1110, 10101], "201": ["A", "B"], "formtype": ["0006", "0006"]}
-    )
-    expected_output["201"] = expected_output["201"].astype("category")
-    return expected_output
+def mapper():
+    mapper_rows = [
+        [36, "N"],
+        [37, "Y"],
+        [45, "AC"],
+        [47, "AD"],
+        [49, "AD"],
+        [50, "AD"],
+        [58, "AH"],
+    ]
+    columns = ["pg_numeric", "pg_alpha"]

+    # Create the DataFrame
+    mapper_df = pd.DataFrame(mapper_rows, columns=columns)

-def test_sic_mapper(sic_dummy_data, sic_expected_output, sic_mapper):
-    """Tests for pg mapper function."""
+    # Return the DataFrame
+    return mapper_df

-    expected_output_data = sic_expected_output

-    df_result = sic_to_pg_mapper(sic_dummy_data, sic_mapper, target_col="201")
+def test_pg_to_pg_mapper_with_many_to_one(mapper):

-    pd.testing.assert_frame_equal(df_result, expected_output_data)
+    columns = ["formtype", "201", "other_col"]
+    row_data = [
+        ["0001", 45, "2020"],
+        ["0001", 49, "2020"],
+        ["0002", 50, "2020"],
+    ]
+
+    test_df = pd.DataFrame(row_data, columns=columns)
+
+    expected_columns = ["formtype", "201", "other_col", "pg_numeric"]
+
+    expected_data = [
+        ["0001", "AC", "2020", 45],
+        ["0001", "AD", "2020", 49],
+        ["0002", "AD", "2020", 50],
+    ]
+
+    type_dict = {"201": "category", "pg_numeric": "category"}
+
+    # Build the expected result dataframe. Set the dtype of prod group to cat, like the result_df
+    expected_result_df = pd.DataFrame(expected_data, columns=expected_columns)
+    expected_result_df = expected_result_df.astype(type_dict)
+
+    result_df = pg_to_pg_mapper(test_df.copy(), mapper.copy())
+
+    pd.testing.assert_frame_equal(result_df, expected_result_df, check_dtype=False)
+
+
+def test_pg_to_pg_mapper_success(mapper):
+    columns = ["formtype", "201", "other_col"]
+    row_data = [
+        ["0001", 36, "2020"],
+        ["0001", 45, "2020"],
+        ["0002", 58, "2020"],
+        ["0001", 49, "2020"],
+    ]
+
+    test_df = pd.DataFrame(row_data, columns=columns)
+
+    expected_columns = ["formtype", "201", "other_col", "pg_numeric"]
+    expected_data = [
+        ["0001", "N", "2020", 36],
+        ["0001", "AC", "2020", 45],
+        ["0002", "AH", "2020", 58],
+        ["0001", "AD", "2020", 49],
+    ]
+
+    expected_result_df = pd.DataFrame(expected_data, columns=expected_columns)
+
+    type_dict = {"201": "category", "pg_numeric": "category"}
+    expected_result_df = expected_result_df.astype(type_dict)
+
+    result_df = pg_to_pg_mapper(test_df.copy(), mapper.copy())
+
+    pd.testing.assert_frame_equal(result_df, expected_result_df)
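One path the new tests do not yet exercise is the error branch: both mappers raise when the mapper file contains codes with no mapped value. A sketch of such a test, not part of this diff, written in the same fixture style and assuming the hypothetical name test_sic_mapper_raises_on_unmapped_codes:

import numpy as np
import pandas as pd
import pytest

from src.imputation.pg_conversion import sic_to_pg_mapper


def test_sic_mapper_raises_on_unmapped_codes():
    df = pd.DataFrame({"201": [np.nan], "rusic": [1600]})
    # A mapper row with a missing (NaN) target value should trigger the error branch
    broken_mapper = pd.DataFrame({"sic": [1600], "pg": [np.nan]})

    with pytest.raises(Exception):
        sic_to_pg_mapper(
            df, broken_mapper, pg_column="201", from_col="sic", to_col="pg"
        )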