From 7fdcfa08dd87844b3235e75bbe211724b1a6c53e Mon Sep 17 00:00:00 2001 From: Griffith Date: Thu, 11 Jan 2024 17:47:45 +0000 Subject: [PATCH 1/6] update tests --- src/outputs/form_output_prep.py | 8 +- src/outputs/gb_sas.py | 15 --- src/outputs/outputs_main.py | 2 - src/outputs/tau.py | 13 -- src/staging/pg_conversion.py | 152 ++++++++++----------- src/staging/staging_main.py | 1 - tests/test_staging/test_pg_conversion.py | 160 ++++++++++++++++------- 7 files changed, 187 insertions(+), 164 deletions(-) diff --git a/src/outputs/form_output_prep.py b/src/outputs/form_output_prep.py index 0e3898484..7b2401562 100644 --- a/src/outputs/form_output_prep.py +++ b/src/outputs/form_output_prep.py @@ -1,5 +1,5 @@ import pandas as pd -from src.staging.pg_conversion import run_pg_conversion +from src.staging.pg_conversion import sic_to_pg_mapper from src.staging.validation import flag_no_rand_spenders @@ -59,9 +59,9 @@ def form_output_prep( ni_full_responses["form_status"] = 600 ni_full_responses["602"] = 100 ni_full_responses["formtype"] = "0003" - ni_full_responses = run_pg_conversion( - ni_full_responses, pg_num_alpha, sic_pg_alpha, target_col="201" - ) + + # Update column 201 (currently PG numeric) to alpha-numeric, mapping from SIC. + ni_full_responses = sic_to_pg_mapper(ni_full_responses, sic_pg_alpha) # outputs_df = pd.concat([outputs_df, ni_full_responses]) tau_outputs_df = pd.concat([tau_outputs_df, ni_full_responses]) diff --git a/src/outputs/gb_sas.py b/src/outputs/gb_sas.py index 4435a465f..73ffaad8c 100644 --- a/src/outputs/gb_sas.py +++ b/src/outputs/gb_sas.py @@ -7,7 +7,6 @@ import src.outputs.map_output_cols as map_o from src.staging.validation import load_schema from src.outputs.outputs_helpers import create_output_df, regions -from src.staging.pg_conversion import sic_to_pg_mapper GbSasLogger = logging.getLogger(__name__) @@ -19,7 +18,6 @@ def output_gb_sas( run_id: int, ultfoc_mapper: pd.DataFrame, postcode_mapper: pd.DataFrame, - sic_pg_num: pd.DataFrame, ): """Run the outputs module. @@ -31,8 +29,6 @@ def output_gb_sas( run_id (int): The current run id ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame. postcode_mapper (pd.DataFrame): maps the postcode to region code - pg_alpha_num (pd.DataFrame): mapper of numeric PG to alpha PG - """ NETWORK_OR_HDFS = config["global"]["network_or_hdfs"] @@ -47,20 +43,9 @@ def output_gb_sas( # Join foriegn ownership column using ultfoc mapper df1 = map_o.join_fgn_ownership(df1, ultfoc_mapper) - # Fill in numeric PG for short forms and imputed long forms - df1 = sic_to_pg_mapper( - df1, - sic_pg_num, - target_col="pg_numeric", - from_col="SIC 2007_CODE", - to_col="2016 > Form PG", - formtype=["0006", "0001"], - ) - # Map to the CORA statuses from the statusencoded column df1 = map_o.create_cora_status_col(df1) - # Map the sizebands based on frozen employment df1 = map_o.map_sizebands(df1) diff --git a/src/outputs/outputs_main.py b/src/outputs/outputs_main.py index c61280772..161c0be35 100644 --- a/src/outputs/outputs_main.py +++ b/src/outputs/outputs_main.py @@ -123,7 +123,6 @@ def run_outputs( run_id, ultfoc_mapper, postcode_mapper, - sic_pg_num, ) OutputMainLogger.info("Finished TAU output.") @@ -137,7 +136,6 @@ def run_outputs( run_id, ultfoc_mapper, postcode_mapper, - sic_pg_num, ) OutputMainLogger.info("Finished GB SAS output.") diff --git a/src/outputs/tau.py b/src/outputs/tau.py index e06c70a83..329ab32d3 100644 --- a/src/outputs/tau.py +++ b/src/outputs/tau.py @@ -18,7 +18,6 @@ def output_tau( run_id: int, ultfoc_mapper: pd.DataFrame, postcode_itl_mapper: pd.DataFrame, - sic_pg_num: pd.DataFrame, ): """Run the outputs module. @@ -30,8 +29,6 @@ def output_tau( run_id (int): The current run id ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame. postcode_itl_mapper (pd.DataFrame): maps the postcode to region code - pg_alpha_num (pd.DataFrame): mapper of alpha PG to numeric PG - """ NETWORK_OR_HDFS = config["global"]["network_or_hdfs"] @@ -49,16 +46,6 @@ def output_tau( # Join foriegn ownership column using ultfoc mapper df = map_o.join_fgn_ownership(df, ultfoc_mapper, formtype=["0001", "0006"]) - # Fill in numeric PG for short forms and imputed long forms - df = sic_to_pg_mapper( - df, - sic_pg_num, - target_col="pg_numeric", - from_col="SIC 2007_CODE", - to_col="2016 > Form PG", - formtype=["0006", "0001", "0003"], - ) - # Map to the CORA statuses from the statusencoded column df = map_o.create_cora_status_col(df) diff --git a/src/staging/pg_conversion.py b/src/staging/pg_conversion.py index c6fc43aee..5fbca575c 100644 --- a/src/staging/pg_conversion.py +++ b/src/staging/pg_conversion.py @@ -5,24 +5,34 @@ PgLogger = logging.getLogger(__name__) -def pg_to_pg_mapper( +def sic_to_pg_mapper( df: pd.DataFrame, - mapper: pd.DataFrame, - target_col: str = "product_group", + sicmapper: pd.DataFrame, pg_column: str = "201", - from_col: str = "pg_numeric", - to_col: str = "pg_alpha", + sic_column: str = "rusic", + from_col: str = "SIC 2007_CODE", + to_col: str = "2016 > Form PG", ): - """This function maps all values in one column to another column - using a mapper file. This is applied to long forms only. - The default this is used for is PG numeric to letter conversion. + """Map from SIC code to PG numeric code where PG numeric is null. + + Example initial dataframe: + reference | 201 | rusic + -------------------------------- + 1 | 53 | 2500 + 2 | NaN | 1600 + 3 | NaN | 4300 + + returned dataframe: + reference | 201 | rusic + -------------------------------- + 1 | 53 | 2500 + 2 | 45 | 1600 + 3 | 38 | 4300 Args: - df (pd.DataFrame): The dataset containing all the PG numbers - mapper (pd.DataFrame): The mapper dataframe loaded using custom function - target_col (str, optional): The column we output the - mapped values to (product_group). - pg_column (str, optional): The column we want to convert (201). + df (pd.DataFrame): The dataset containing all the PG numbers. + sicmapper (pd.DataFrame): The SIC to pg numeric mapper. + sic_column (str, optional): The column containing the SIC numbers. from_col (str, optional): The column in the mapper that is used to map from. to_col (str, optional): The column in the mapper that is used to map to. @@ -30,15 +40,11 @@ def pg_to_pg_mapper( pd.DataFrame: A dataframe with all target column values mapped """ - filtered_df = df.copy() - - if "formtype" in filtered_df.columns: - formtype_cond = filtered_df["formtype"] == "0001" - filtered_df = filtered_df[formtype_cond] + df = df.copy() # Create a mapping dictionary from the 2 columns - map_dict = dict(zip(mapper[from_col], mapper[to_col])) - # Flag all PGs that don't have a corresponding map value + map_dict = dict(zip(sicmapper[from_col], sicmapper[to_col])) + # Flag all SIC numbers that don't have a corresponding map value mapless_errors = [] for key, value in map_dict.items(): if str(value) == "nan": @@ -46,45 +52,49 @@ def pg_to_pg_mapper( if mapless_errors: PgLogger.error( - f"Mapping doesnt exist for the following product groups: {mapless_errors}" + f"Mapping doesnt exist for the following SIC numbers: {mapless_errors}" ) - # Map using the dictionary taking into account the null values. - # Then convert to categorigal datatype - filtered_df[pg_column] = pd.to_numeric(filtered_df[pg_column], errors="coerce") - filtered_df[target_col] = filtered_df[pg_column].map(map_dict) - filtered_df[target_col] = filtered_df[target_col].astype("category") + # Map to the target column using the dictionary, null values only + df.loc[df[pg_column].isnull(), pg_column] = ( + df.loc[df[pg_column].isnull(), sic_column].map(map_dict) + ) - df.loc[ - filtered_df.index, - f"{target_col}", - ] = filtered_df[target_col] - - PgLogger.info("Product groups successfully mapped to letters") + PgLogger.info("Product group nulls successfully mapped from SIC.") return df -def sic_to_pg_mapper( +def pg_to_pg_mapper( df: pd.DataFrame, - sicmapper: pd.DataFrame, - target_col: str = "product_group", - sic_column: str = "rusic", - from_col: str = "sic", + mapper: pd.DataFrame, + pg_column: str = "201", + from_col: str = "pg_numeric", to_col: str = "pg_alpha", - formtype: str = ["0006"], ): - """This function maps all values in one column to another column - using a mapper file. This is only applied for short forms and unsampled - refs. + """Map from PG numeric to PG alpha-numeric and create a new column. + + The product group column (default: column 201) coped to a new column, "pg_numeric", + and then is updated from numeric to alpha-numeric using a mapping. + + Example initial dataframe: + reference | 201 + ---------------------- + 1 | 53 + 2 | 43 + 3 | 33 + + returned dataframe: + reference | 201 | pg_numeric + ------------------------------------ + 1 | AA | 33 + 2 | B | 43 + 3 | E | 53 - The default this is used for is PG numeric to letter conversion. Args: - df (pd.DataFrame): The dataset containing all the PG numbers. - sicmapper (pd.DataFrame): The mapper dataframe loaded using custom function. - target_col (str, optional): The column we output the - mapped values to (product_group). - sic_column (str, optional): The column containing the SIC numbers. + df (pd.DataFrame): The dataframe requiring mapping + mapper (pd.DataFrame): the PG numeric to alpha-numeric mapper + pg_column (str, optional): The column we want to convert (default 201). from_col (str, optional): The column in the mapper that is used to map from. to_col (str, optional): The column in the mapper that is used to map to. @@ -92,16 +102,15 @@ def sic_to_pg_mapper( pd.DataFrame: A dataframe with all target column values mapped """ - filtered_df = df.copy() - - filtered_df = filtered_df[filtered_df["formtype"].isin(formtype)] + df = df.copy() - if "pg_numeric" in filtered_df.columns: - filtered_df = filtered_df[filtered_df["pg_numeric"].isnull()] + # Copy the numeric PG column to a new column + df["pg_numeric"] = df[pg_column].copy() # Create a mapping dictionary from the 2 columns - map_dict = dict(zip(sicmapper[from_col], sicmapper[to_col])) - # Flag all SIC numbers that don't have a corresponding map value + map_dict = dict(zip(mapper[from_col], mapper[to_col])) + + # Flag all PGs that don't have a corresponding map value mapless_errors = [] for key, value in map_dict.items(): if str(value) == "nan": @@ -109,22 +118,15 @@ def sic_to_pg_mapper( if mapless_errors: PgLogger.error( - f"Mapping doesnt exist for the following SIC numbers: {mapless_errors}" + f"Mapping doesnt exist for the following product groups: {mapless_errors}" ) - # Map to the target column using the dictionary taking into account the null values. - # Then convert to categorigal datatype - filtered_df[sic_column] = pd.to_numeric(filtered_df[sic_column], errors="coerce") - filtered_df[target_col] = filtered_df[sic_column].map(map_dict) - filtered_df[target_col] = filtered_df[target_col].astype("category") - df = df.copy() + df[pg_column] = df[pg_column].map(map_dict) - df.loc[ - filtered_df.index, - f"{target_col}", - ] = filtered_df[target_col] + # Then convert the pg column and the new column to categorigal datatypes + df = df.astype({pg_column: "category", "pg_numeric": "category"}) - PgLogger.info("SIC numbers successfully mapped to PG letters") + PgLogger.info("Numeric product groups successfully mapped to letters.") return df @@ -147,22 +149,10 @@ def run_pg_conversion( Returns: (pd.DataFrame): Dataframe with mapped values """ + # Where the + df = sic_to_pg_mapper(df, sic_pg_alpha, ) - df["pg_numeric"] = df["201"].copy() - - if target_col == "201": - target_col = "201_mapping" - else: - # Create a new column to store PGs - df[target_col] = np.nan - - # SIC mapping for short forms - df = sic_to_pg_mapper(df, sic_pg_alpha, target_col=target_col) - - # SIC mapping for NI - df = sic_to_pg_mapper(df, sic_pg_alpha, target_col=target_col, formtype=["0003"]) - - # PG mapping for long forms + # PG numeric to alpha_numeric mapping for long forms df = pg_to_pg_mapper(df, pg_num_alpha, target_col=target_col) # Overwrite the 201 column if target_col = 201 diff --git a/src/staging/staging_main.py b/src/staging/staging_main.py index 8da3cbffd..6d072a475 100644 --- a/src/staging/staging_main.py +++ b/src/staging/staging_main.py @@ -217,7 +217,6 @@ def run_staging( backdata = pg.pg_to_pg_mapper( backdata, pg_num_alpha, - target_col="q201", pg_column="q201", ) StagingMainLogger.info("Backdata File Loaded Successfully...") diff --git a/tests/test_staging/test_pg_conversion.py b/tests/test_staging/test_pg_conversion.py index a77c2b9f5..0fa74af0c 100644 --- a/tests/test_staging/test_pg_conversion.py +++ b/tests/test_staging/test_pg_conversion.py @@ -8,72 +8,136 @@ @pytest.fixture -def dummy_data() -> pd.DataFrame: +def sic_dummy_data() -> pd.DataFrame: # Set up the dummyinput data - data = pd.DataFrame( - {"201": [0, 1, 2, 3, 4], "formtype": ["0001", "0001", "0001", "0001", "0001"]} - ) - return data + columns = ["201", "rusic"] + data = [ + [53, 2500], + [np.nan, 1600], + [np.nan, 4300], + ] + + return pd.DataFrame(data, columns=columns) @pytest.fixture -def mapper() -> pd.DataFrame: - # Set up the dummy mapper data - mapper = { - "pg_numeric": [0, 1, 2, 3, 4], - "pg_alpha": [np.nan, "A", "B", "C", "C"], - } - return pd.DataFrame(mapper) +def sic_mapper(): + columns = ["sic", "pg"] + mapper_rows = [ + [1600, 36], + [2500, 95], + [7300, 45], + [2500, 53], + ] + + # Create the DataFrame + return pd.DataFrame(mapper_rows, columns=columns) @pytest.fixture -def expected_output() -> pd.DataFrame: +def sic_expected_output() -> pd.DataFrame: # Set up the dummy output data - expected_output = pd.DataFrame( - { - "201": [np.nan, "A", "B", "C", "C"], - "formtype": ["0001", "0001", "0001", "0001", "0001"], - } - ) + columns = ["201", "rusic"] + data = [ + [53, 2500], + [36, 1600], + [np.nan, 4300], + ] - expected_output["201"] = expected_output["201"].astype("category") - return expected_output + return pd.DataFrame(data, columns=columns) -@pytest.fixture -def sic_dummy_data() -> pd.DataFrame: - # Set up the dummyinput data - data = pd.DataFrame( - {"rusic": [1110, 10101], "201": [np.nan, np.nan], "formtype": ["0006", "0006"]} - ) - return data +def test_sic_mapper(sic_dummy_data, sic_expected_output, sic_mapper): + """Tests for pg mapper function.""" + expected_output_data = sic_expected_output -@pytest.fixture -def sic_mapper() -> pd.DataFrame: - # Set up the dummy mapper data - mapper = { - "sic": [1110, 10101], - "pg_alpha": ["A", "B"], - } - return pd.DataFrame(mapper) + df_result = sic_to_pg_mapper( + sic_dummy_data, + sic_mapper, + pg_column="201", + from_col="sic", + to_col="pg", + ) + + pd.testing.assert_frame_equal(df_result, expected_output_data) @pytest.fixture -def sic_expected_output() -> pd.DataFrame: - # Set up the dummy output data - expected_output = pd.DataFrame( - {"rusic": [1110, 10101], "201": ["A", "B"], "formtype": ["0006", "0006"]} - ) - expected_output["201"] = expected_output["201"].astype("category") - return expected_output +def mapper(): + mapper_rows = [ + [36, "N"], + [37, "Y"], + [45, "AC"], + [47, "AD"], + [49, "AD"], + [50, "AD"], + [58, "AH"], + ] + columns = ["pg_numeric", "pg_alpha"] + # Create the DataFrame + mapper_df = pd.DataFrame(mapper_rows, columns=columns) -def test_sic_mapper(sic_dummy_data, sic_expected_output, sic_mapper): - """Tests for pg mapper function.""" + # Return the DataFrame + return mapper_df - expected_output_data = sic_expected_output - df_result = sic_to_pg_mapper(sic_dummy_data, sic_mapper, target_col="201") +def test_pg_to_pg_mapper_with_many_to_one(mapper): - pd.testing.assert_frame_equal(df_result, expected_output_data) + columns = ["formtype", "201", "other_col"] + row_data = [ + ["0001", 45, "2020"], + ["0001", 49, "2020"], + ["0002", 50, "2020"] + ] + + test_df = pd.DataFrame(row_data, columns=columns) + + expected_columns = ["formtype", "201", "other_col", "pg_numeric"] + + expected_data = [ + ["0001", "AC", "2020", 45], + ["0001", "AD", "2020", 49], + ["0002", "AD", "2020", 50] + ] + + type_dict = {"201": "category", "pg_numeric": "category"} + + # Build the expected result dataframe. Set the dtype of prod group to cat, like the result_df + expected_result_df = pd.DataFrame(expected_data, columns=expected_columns) + expected_result_df = expected_result_df.astype(type_dict) + + result_df = pg_to_pg_mapper(test_df.copy(), mapper.copy()) + + pd.testing.assert_frame_equal(result_df, expected_result_df, check_dtype=False) + + +def test_pg_to_pg_mapper_success(mapper): + columns = ["formtype", "201", "other_col"] + row_data = [ + ["0001", 36, "2020"], + ["0001", 45, "2020"], + ["0002", 58, "2020"], + ["0001", 49, "2020"], + ] + + test_df = pd.DataFrame(row_data, columns=columns) + + expected_columns = ["formtype", "201", "other_col", "pg_numeric"] + expected_data = [ + ["0001", "N", "2020", 36], + ["0001", "AC", "2020", 45], + ["0002", "AH", "2020", 58], + ["0001", "AD", "2020", 49], + ] + + expected_result_df = pd.DataFrame( + expected_data, columns=expected_columns) + + type_dict = {"201": "category", "pg_numeric": "category"} + expected_result_df = expected_result_df.astype(type_dict) + + result_df = pg_to_pg_mapper(test_df.copy(), mapper.copy()) + + pd.testing.assert_frame_equal(result_df, expected_result_df) From 53bb0944102de8a4fff388a83d5ec4bf6606b92b Mon Sep 17 00:00:00 2001 From: Griffith Date: Thu, 11 Jan 2024 18:38:00 +0000 Subject: [PATCH 2/6] move pg_conversion to imputation --- src/imputation/imputation_main.py | 24 ++++++++++++++++++-- src/{staging => imputation}/pg_conversion.py | 16 ++++--------- src/imputation/tmi_imputation.py | 13 +++-------- src/outputs/form_output_prep.py | 10 +++++--- src/outputs/ni_sas.py | 2 +- src/outputs/outputs_main.py | 4 ++-- src/outputs/tau.py | 1 - src/pipeline.py | 5 ++-- tests/test_staging/test_pg_conversion.py | 2 +- 9 files changed, 44 insertions(+), 33 deletions(-) rename src/{staging => imputation}/pg_conversion.py (91%) diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py index a023f982c..34a7172d7 100644 --- a/src/imputation/imputation_main.py +++ b/src/imputation/imputation_main.py @@ -7,6 +7,7 @@ from src.imputation import imputation_helpers as hlp from src.imputation import tmi_imputation as tmi from src.staging.validation import load_schema +from src.imputation.pg_conversion import run_pg_conversion, pg_to_pg_mapper from src.imputation.apportionment import run_apportionment from src.imputation.short_to_long import run_short_to_long from src.imputation.MoR import run_mor @@ -21,7 +22,8 @@ def run_imputation( df: pd.DataFrame, manual_trimming_df: pd.DataFrame, - mapper: pd.DataFrame, + pg_num_alpha: pd.DataFrame, + sic_pg_num: pd.DataFrame, backdata: pd.DataFrame, config: Dict[str, Any], write_csv: Callable, @@ -48,6 +50,11 @@ def run_imputation( Returns: pd.DataFrame: dataframe with the imputed columns updated """ + # Carry out product group conversion + df = run_pg_conversion( + df, pg_num_alpha, sic_pg_num, pg_column="201" + ) + # Apportion cols 4xx and 5xx to create FTE and headcount values df = run_apportionment(df) @@ -92,11 +99,24 @@ def run_imputation( # Run MoR if backdata is not None: + # Fix for different column names on network vs hdfs + if NETWORK_OR_HDFS == "network": + # Map PG numeric to alpha in column q201 + # This isn't done on HDFS as the column is already mapped + backdata = pg_to_pg_mapper( + backdata, + pg_num_alpha, + pg_column="q201", + from_col= "pg_numeric", + to_col="pg_alpha", + ) + backdata = backdata.drop("pg_numeric", axis=1) + lf_target_vars = config["imputation"]["lf_target_vars"] df, links_df = run_mor(df, backdata, to_impute_cols, lf_target_vars, config) # Run TMI for long forms and short forms - imputed_df, qa_df = tmi.run_tmi(df, mapper, config) + imputed_df, qa_df = tmi.run_tmi(df, config) # After imputation, correction to ignore the "604" == "No" in any records with # Status "check needed" diff --git a/src/staging/pg_conversion.py b/src/imputation/pg_conversion.py similarity index 91% rename from src/staging/pg_conversion.py rename to src/imputation/pg_conversion.py index 5fbca575c..4649096a9 100644 --- a/src/staging/pg_conversion.py +++ b/src/imputation/pg_conversion.py @@ -134,8 +134,8 @@ def pg_to_pg_mapper( def run_pg_conversion( df: pd.DataFrame, pg_num_alpha: pd.DataFrame, - sic_pg_alpha: pd.DataFrame, - target_col: str = "201", + sic_pg_num: pd.DataFrame, + pg_column: str = "201", ): """Run the product group mapping functions and return a dataframe with the correct mapping for each formtype. @@ -143,21 +143,15 @@ def run_pg_conversion( Args: df (pd.DataFrame): Dataframe of full responses data mapper (pd.DataFrame): The mapper file used for PG conversion - target_col (str, optional): The column to be created - which stores mapped values. + pg_column: The original product group column Returns: (pd.DataFrame): Dataframe with mapped values """ # Where the - df = sic_to_pg_mapper(df, sic_pg_alpha, ) + df = sic_to_pg_mapper(df, sic_pg_num, pg_column) # PG numeric to alpha_numeric mapping for long forms - df = pg_to_pg_mapper(df, pg_num_alpha, target_col=target_col) - - # Overwrite the 201 column if target_col = 201 - if target_col == "201_mapping": - df["201"] = df[target_col] - df = df.drop(columns=[target_col]) + df = pg_to_pg_mapper(df, pg_num_alpha, pg_column) return df diff --git a/src/imputation/tmi_imputation.py b/src/imputation/tmi_imputation.py index ecd170875..c3ea7eaff 100644 --- a/src/imputation/tmi_imputation.py +++ b/src/imputation/tmi_imputation.py @@ -3,7 +3,7 @@ import numpy as np from typing import Dict, List, Tuple, Any -from src.staging.pg_conversion import sic_to_pg_mapper +from src.imputation.pg_conversion import sic_to_pg_mapper from src.imputation.impute_civ_def import impute_civil_defence from src.imputation import expansion_imputation as ximp @@ -425,7 +425,6 @@ def calculate_totals(df): def run_longform_tmi( longform_df: pd.DataFrame, - sic_mapper: pd.DataFrame, config: Dict[str, Any], ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Function to run imputation end to end and returns the final @@ -434,7 +433,6 @@ def run_longform_tmi( Args: longform_df (pd.DataFrame): the dataset filtered for long form entries target_variables (list): key variables - sic_mapper (pd.DataFrame): dataframe with sic mapper info config (Dict): the configuration settings Returns: final_df: dataframe with the imputed valued added @@ -442,10 +440,7 @@ def run_longform_tmi( qa_df: qa dataframe """ TMILogger.info("Starting TMI long form imputation.") - - # TMI Step 1: impute the Product Group - df = impute_pg_by_sic(longform_df, sic_mapper) - + df = longform_df.copy() # TMI Step 2: impute for R&D type (civil or defence) df = impute_civil_defence(df) @@ -520,7 +515,6 @@ def run_shortform_tmi( def run_tmi( full_df: pd.DataFrame, - sic_mapper: pd.DataFrame, config: Dict[str, Any], ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Function to run imputation end to end and returns the final @@ -528,7 +522,6 @@ def run_tmi( dataframe back to the pipeline Args: full_df (pd.DataFrame): the full responses spp dataframe - sic_mapper (pd.DataFrame): dataframe with sic to product group mapper info config (Dict): the configuration settings Returns: final_df(pd.DataFrame): dataframe with the imputed valued added and counts columns @@ -553,7 +546,7 @@ def run_tmi( excluded_df = full_df.copy().loc[mor_mask] # apply TMI imputation to long forms and then short forms - longform_tmi_df, qa_df_long = run_longform_tmi(longform_df, sic_mapper, config) + longform_tmi_df, qa_df_long = run_longform_tmi(longform_df, config) shortform_tmi_df, qa_df_short = run_shortform_tmi(shortform_df, config) diff --git a/src/outputs/form_output_prep.py b/src/outputs/form_output_prep.py index 7b2401562..4ac885b41 100644 --- a/src/outputs/form_output_prep.py +++ b/src/outputs/form_output_prep.py @@ -1,5 +1,5 @@ import pandas as pd -from src.staging.pg_conversion import sic_to_pg_mapper +from src.imputation.pg_conversion import run_pg_conversion from src.staging.validation import flag_no_rand_spenders @@ -8,7 +8,7 @@ def form_output_prep( weighted_df: pd.DataFrame, ni_full_responses: pd.DataFrame, pg_num_alpha: pd.DataFrame, - sic_pg_alpha: pd.DataFrame, + sic_pg_num: pd.DataFrame, ): """Prepares the data for the outputs. @@ -61,7 +61,11 @@ def form_output_prep( ni_full_responses["formtype"] = "0003" # Update column 201 (currently PG numeric) to alpha-numeric, mapping from SIC. - ni_full_responses = sic_to_pg_mapper(ni_full_responses, sic_pg_alpha) + ni_full_responses = run_pg_conversion( + ni_full_responses, + pg_num_alpha, + sic_pg_num + ) # outputs_df = pd.concat([outputs_df, ni_full_responses]) tau_outputs_df = pd.concat([tau_outputs_df, ni_full_responses]) diff --git a/src/outputs/ni_sas.py b/src/outputs/ni_sas.py index b9ea85285..538dcf9f7 100644 --- a/src/outputs/ni_sas.py +++ b/src/outputs/ni_sas.py @@ -6,7 +6,7 @@ import src.outputs.map_output_cols as map_o from src.staging.validation import load_schema from src.outputs.outputs_helpers import create_output_df -from src.staging.pg_conversion import sic_to_pg_mapper +from src.imputation.pg_conversion import sic_to_pg_mapper OutputMainLogger = logging.getLogger(__name__) diff --git a/src/outputs/outputs_main.py b/src/outputs/outputs_main.py index 161c0be35..1de77450b 100644 --- a/src/outputs/outputs_main.py +++ b/src/outputs/outputs_main.py @@ -58,7 +58,7 @@ def run_outputs( civil_defence_detailed (pd.DataFrame): Detailed descriptons of civil/defence sic_division_detailed (pd.DataFrame): Detailed descriptons of SIC divisions pg_num_alpha (pd.DataFrame): Mapper for product group conversions (num to alpha) - sic_pg_alpha (pd.DataFrame): Mapper for product group conversions (SIC to alpha) + sic_pg_num (pd.DataFrame): Mapper for product group conversions """ ( @@ -71,7 +71,7 @@ def run_outputs( weighted_df, ni_full_responses, pg_num_alpha, - sic_pg_alpha, + sic_pg_num, ) # Running status filtered full dataframe output for QA diff --git a/src/outputs/tau.py b/src/outputs/tau.py index 329ab32d3..02e7ed11b 100644 --- a/src/outputs/tau.py +++ b/src/outputs/tau.py @@ -6,7 +6,6 @@ import src.outputs.map_output_cols as map_o from src.staging.validation import load_schema from src.outputs.outputs_helpers import create_output_df -from src.staging.pg_conversion import sic_to_pg_mapper OutputMainLogger = logging.getLogger(__name__) diff --git a/src/pipeline.py b/src/pipeline.py index 81ded7174..83f9cccad 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -137,7 +137,8 @@ def run_pipeline(start, config_path): imputed_df = run_imputation( full_responses, manual_trimming_df, - sic_pg_alpha, + pg_num_alpha, + sic_pg_num, backdata, config, write_csv, @@ -196,7 +197,7 @@ def run_pipeline(start, config_path): civil_defence_detailed, sic_division_detailed, pg_num_alpha, - sic_pg_alpha, + sic_pg_num, ) MainLogger.info("Finished All Output modules.") diff --git a/tests/test_staging/test_pg_conversion.py b/tests/test_staging/test_pg_conversion.py index 0fa74af0c..d39418fd7 100644 --- a/tests/test_staging/test_pg_conversion.py +++ b/tests/test_staging/test_pg_conversion.py @@ -4,7 +4,7 @@ import pytest import numpy as np -from src.staging.pg_conversion import pg_to_pg_mapper, sic_to_pg_mapper +from src.imputation.pg_conversion import pg_to_pg_mapper, sic_to_pg_mapper @pytest.fixture From 8b0176accc500d0174923aa2face4ffb90eb350e Mon Sep 17 00:00:00 2001 From: Griffith Date: Mon, 15 Jan 2024 10:09:19 +0000 Subject: [PATCH 3/6] 648 minor changes --- src/imputation/pg_conversion.py | 15 ++++++++------- src/staging/staging_main.py | 15 --------------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/src/imputation/pg_conversion.py b/src/imputation/pg_conversion.py index 4649096a9..76bcf3dd1 100644 --- a/src/imputation/pg_conversion.py +++ b/src/imputation/pg_conversion.py @@ -73,8 +73,10 @@ def pg_to_pg_mapper( ): """Map from PG numeric to PG alpha-numeric and create a new column. - The product group column (default: column 201) coped to a new column, "pg_numeric", - and then is updated from numeric to alpha-numeric using a mapping. + The mapper used is from a file named pg_num_alpha.csv + + The product group column (default: column 201) is copied to a new column, + "pg_numeric", and then the original column is mapped from numeric to alpha-numeric. Example initial dataframe: reference | 201 @@ -137,18 +139,17 @@ def run_pg_conversion( sic_pg_num: pd.DataFrame, pg_column: str = "201", ): - """Run the product group mapping functions and return a - dataframe with the correct mapping for each formtype. + """Run the product group (PG) mapping functions. Args: df (pd.DataFrame): Dataframe of full responses data - mapper (pd.DataFrame): The mapper file used for PG conversion - pg_column: The original product group column + pg_num_alpha (pd.DataFrame): Mapper from numeric to alpha-numeric PG. + pg_column: The original product group column, default 201 Returns: (pd.DataFrame): Dataframe with mapped values """ - # Where the + # Where product group is null, map it from SIC. df = sic_to_pg_mapper(df, sic_pg_num, pg_column) # PG numeric to alpha_numeric mapping for long forms diff --git a/src/staging/staging_main.py b/src/staging/staging_main.py index 6d072a475..383c18d14 100644 --- a/src/staging/staging_main.py +++ b/src/staging/staging_main.py @@ -8,7 +8,6 @@ # Our own modules from src.staging import validation as val -from src.staging import pg_conversion as pg import src.staging.staging_helpers as helpers @@ -210,15 +209,6 @@ def run_staging( # backdata_path, "./config/backdata_schema.toml" # ) - # Fix for different column names on network vs hdfs - if network_or_hdfs == "network": - # Map PG numeric to alpha in column q201 - # This isn't done on HDFS as the column is already mapped - backdata = pg.pg_to_pg_mapper( - backdata, - pg_num_alpha, - pg_column="q201", - ) StagingMainLogger.info("Backdata File Loaded Successfully...") else: backdata = None @@ -286,11 +276,6 @@ def run_staging( mapper_path = paths["mapper_path"] write_csv(f"{mapper_path}/sic_pg_num.csv", sic_pg_utf_mapper) - # Map PG from SIC/PG numbers to column '201'. - full_responses = pg.run_pg_conversion( - full_responses, pg_num_alpha, sic_pg_alpha_mapper, target_col="201" - ) - pg_detailed_mapper = helpers.load_valdiate_mapper( "pg_detailed_mapper_path", paths, From eb637e2ad47cd176db2341e2de405e0fcb94a2c2 Mon Sep 17 00:00:00 2001 From: Griffith Date: Mon, 15 Jan 2024 10:42:33 +0000 Subject: [PATCH 4/6] add exception if mapper not working --- src/imputation/pg_conversion.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/imputation/pg_conversion.py b/src/imputation/pg_conversion.py index 76bcf3dd1..fa6d0556b 100644 --- a/src/imputation/pg_conversion.py +++ b/src/imputation/pg_conversion.py @@ -54,6 +54,8 @@ def sic_to_pg_mapper( PgLogger.error( f"Mapping doesnt exist for the following SIC numbers: {mapless_errors}" ) + raise Exception("Errors in the SIC to PG numeric mapper.") + # Map to the target column using the dictionary, null values only df.loc[df[pg_column].isnull(), pg_column] = ( df.loc[df[pg_column].isnull(), sic_column].map(map_dict) @@ -122,6 +124,7 @@ def pg_to_pg_mapper( PgLogger.error( f"Mapping doesnt exist for the following product groups: {mapless_errors}" ) + raise Exception("Errors in the PG numeric to alpha-numeric mapper.") df[pg_column] = df[pg_column].map(map_dict) From 58e7e578e8f9c51c78c7637c5dbe14a17c6609cf Mon Sep 17 00:00:00 2001 From: Griffith Date: Mon, 15 Jan 2024 10:46:39 +0000 Subject: [PATCH 5/6] remove duplicate line from config --- src/developer_config.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/developer_config.yaml b/src/developer_config.yaml index 7408666a9..a771aade6 100644 --- a/src/developer_config.yaml +++ b/src/developer_config.yaml @@ -27,7 +27,6 @@ global: output_auto_outliers: False output_outlier_qa : False output_estimation_qa: False - output_imputation_qa: False output_apportionment_qa: False output_long_form: False output_short_form: False From 86a91e3fceed8aa9bb6ad00fa46dd97ccb8acc1a Mon Sep 17 00:00:00 2001 From: Griffith Date: Mon, 15 Jan 2024 11:39:45 +0000 Subject: [PATCH 6/6] remove unnecessary pg conversion from NI sas --- src/outputs/ni_sas.py | 16 +--------------- src/outputs/outputs_main.py | 2 -- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/src/outputs/ni_sas.py b/src/outputs/ni_sas.py index 538dcf9f7..717f76854 100644 --- a/src/outputs/ni_sas.py +++ b/src/outputs/ni_sas.py @@ -6,7 +6,7 @@ import src.outputs.map_output_cols as map_o from src.staging.validation import load_schema from src.outputs.outputs_helpers import create_output_df -from src.imputation.pg_conversion import sic_to_pg_mapper +from src.imputation.pg_conversion import run_pg_conversion OutputMainLogger = logging.getLogger(__name__) @@ -16,8 +16,6 @@ def output_ni_sas( config: Dict[str, Any], write_csv: Callable, run_id: int, - sic_pg_num: pd.DataFrame, - postcode_itl_mapper: pd.DataFrame, ): """Run the outputs module. @@ -39,18 +37,6 @@ def output_ni_sas( paths = config[f"{NETWORK_OR_HDFS}_paths"] output_path = paths["output_path"] - # Prepare the columns needed for outputs: - - # Fill in numeric PG where missing - df = sic_to_pg_mapper( - df, - sic_pg_num, - target_col="pg_numeric", - from_col="SIC 2007_CODE", - to_col="2016 > Form PG", - formtype=["0003"], - ) - # Map the sizebands based on frozen employment df = map_o.map_sizebands(df) diff --git a/src/outputs/outputs_main.py b/src/outputs/outputs_main.py index 1de77450b..5bc3556fe 100644 --- a/src/outputs/outputs_main.py +++ b/src/outputs/outputs_main.py @@ -147,8 +147,6 @@ def run_outputs( config, write_csv, run_id, - sic_pg_num, - postcode_mapper, ) OutputMainLogger.info("Finished NI SAS output.")