Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

648 move pg conversion to imputation. #183

Merged
merged 7 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/imputation/imputation_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from src.imputation import imputation_helpers as hlp
from src.imputation import tmi_imputation as tmi
from src.staging.validation import load_schema
from src.imputation.pg_conversion import run_pg_conversion, pg_to_pg_mapper
from src.imputation.apportionment import run_apportionment
from src.imputation.short_to_long import run_short_to_long
from src.imputation.MoR import run_mor
Expand All @@ -21,7 +22,8 @@
def run_imputation(
df: pd.DataFrame,
manual_trimming_df: pd.DataFrame,
mapper: pd.DataFrame,
pg_num_alpha: pd.DataFrame,
sic_pg_num: pd.DataFrame,
backdata: pd.DataFrame,
config: Dict[str, Any],
write_csv: Callable,
Expand All @@ -48,6 +50,11 @@ def run_imputation(
Returns:
pd.DataFrame: dataframe with the imputed columns updated
"""
# Carry out product group conversion
df = run_pg_conversion(
df, pg_num_alpha, sic_pg_num, pg_column="201"
)

# Apportion cols 4xx and 5xx to create FTE and headcount values
df = run_apportionment(df)

Expand Down Expand Up @@ -92,11 +99,24 @@ def run_imputation(

# Run MoR
if backdata is not None:
# Fix for different column names on network vs hdfs
if NETWORK_OR_HDFS == "network":
# Map PG numeric to alpha in column q201
# This isn't done on HDFS as the column is already mapped
backdata = pg_to_pg_mapper(
backdata,
pg_num_alpha,
pg_column="q201",
from_col= "pg_numeric",
to_col="pg_alpha",
)
backdata = backdata.drop("pg_numeric", axis=1)

lf_target_vars = config["imputation"]["lf_target_vars"]
df, links_df = run_mor(df, backdata, to_impute_cols, lf_target_vars, config)

# Run TMI for long forms and short forms
imputed_df, qa_df = tmi.run_tmi(df, mapper, config)
imputed_df, qa_df = tmi.run_tmi(df, config)

# After imputation, correction to ignore the "604" == "No" in any records with
# Status "check needed"
Expand Down
157 changes: 157 additions & 0 deletions src/imputation/pg_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import pandas as pd
import logging
import numpy as np

PgLogger = logging.getLogger(__name__)


def sic_to_pg_mapper(
    df: pd.DataFrame,
    sicmapper: pd.DataFrame,
    pg_column: str = "201",
    sic_column: str = "rusic",
    from_col: str = "SIC 2007_CODE",
    to_col: str = "2016 > Form PG",
) -> pd.DataFrame:
    """Map from SIC code to PG numeric code where PG numeric is null.

    Only rows where the product group column is null are updated; rows that
    already hold a product group value are left unchanged.

    Example initial dataframe:
        reference | 201 | rusic
    --------------------------------
        1         | 53  | 2500
        2         | NaN | 1600
        3         | NaN | 4300

    returned dataframe:
        reference | 201 | rusic
    --------------------------------
        1         | 53  | 2500
        2         | 45  | 1600
        3         | 38  | 4300

    Args:
        df (pd.DataFrame): The dataset containing all the PG numbers.
        sicmapper (pd.DataFrame): The SIC to pg numeric mapper.
        pg_column (str, optional): The column containing the PG numbers.
        sic_column (str, optional): The column containing the SIC numbers.
        from_col (str, optional): The column in the mapper that is used to map from.
        to_col (str, optional): The column in the mapper that is used to map to.

    Returns:
        pd.DataFrame: A dataframe with all target column values mapped
    """
    # Work on a copy so the caller's dataframe is not mutated
    df = df.copy()

    # Create a mapping dictionary from the 2 columns
    map_dict = dict(zip(sicmapper[from_col], sicmapper[to_col]))

    # Flag all SIC numbers that don't have a corresponding map value
    mapless_errors = [key for key, value in map_dict.items() if str(value) == "nan"]

    if mapless_errors:
        # Log and continue: unmapped SIC codes simply remain null in pg_column
        PgLogger.error(
            f"Mapping doesn't exist for the following SIC numbers: {mapless_errors}"
        )

    # Map to the target column using the dictionary, null values only
    df.loc[df[pg_column].isnull(), pg_column] = (
        df.loc[df[pg_column].isnull(), sic_column].map(map_dict)
    )

    PgLogger.info("Product group nulls successfully mapped from SIC.")

    return df


def pg_to_pg_mapper(
    df: pd.DataFrame,
    mapper: pd.DataFrame,
    pg_column: str = "201",
    from_col: str = "pg_numeric",
    to_col: str = "pg_alpha",
) -> pd.DataFrame:
    """Map from PG numeric to PG alpha-numeric and create a new column.

    The product group column (default: column 201) is copied to a new column,
    "pg_numeric", and then is updated from numeric to alpha-numeric using a
    mapping.

    Example initial dataframe (assuming the mapping 53 -> E, 43 -> B, 33 -> AA):
        reference | 201
    ----------------------
        1         | 53
        2         | 43
        3         | 33

    returned dataframe:
        reference | 201 | pg_numeric
    ------------------------------------
        1         | E   | 53
        2         | B   | 43
        3         | AA  | 33

    Args:
        df (pd.DataFrame): The dataframe requiring mapping
        mapper (pd.DataFrame): the PG numeric to alpha-numeric mapper
        pg_column (str, optional): The column we want to convert (default 201).
        from_col (str, optional): The column in the mapper that is used to map from.
        to_col (str, optional): The column in the mapper that is used to map to.

    Returns:
        pd.DataFrame: A dataframe with all target column values mapped
    """
    # Work on a copy so the caller's dataframe is not mutated
    df = df.copy()

    # Preserve the original numeric PG values in a new column
    df["pg_numeric"] = df[pg_column].copy()

    # Create a mapping dictionary from the 2 columns
    map_dict = dict(zip(mapper[from_col], mapper[to_col]))

    # Flag all PGs that don't have a corresponding map value
    mapless_errors = [key for key, value in map_dict.items() if str(value) == "nan"]

    if mapless_errors:
        # Log and continue: unmapped product groups become null after .map()
        PgLogger.error(
            f"Mapping doesn't exist for the following product groups: {mapless_errors}"
        )

    # Map the PG column from numeric to alpha-numeric
    df[pg_column] = df[pg_column].map(map_dict)

    # Then convert the pg column and the new column to categorical datatypes
    df = df.astype({pg_column: "category", "pg_numeric": "category"})

    PgLogger.info("Numeric product groups successfully mapped to letters.")

    return df


def run_pg_conversion(
    df: pd.DataFrame,
    pg_num_alpha: pd.DataFrame,
    sic_pg_num: pd.DataFrame,
    pg_column: str = "201",
) -> pd.DataFrame:
    """Run the product group mapping functions and return a
    dataframe with the correct mapping for each formtype.

    Args:
        df (pd.DataFrame): Dataframe of full responses data
        pg_num_alpha (pd.DataFrame): Mapper from PG numeric to PG alpha-numeric
        sic_pg_num (pd.DataFrame): Mapper from SIC code to PG numeric
        pg_column (str, optional): The original product group column (default "201")

    Returns:
        (pd.DataFrame): Dataframe with mapped values
    """
    # Where the PG value is null, impute the numeric PG from the SIC code
    df = sic_to_pg_mapper(df, sic_pg_num, pg_column)

    # PG numeric to alpha_numeric mapping for long forms
    df = pg_to_pg_mapper(df, pg_num_alpha, pg_column)

    return df
13 changes: 3 additions & 10 deletions src/imputation/tmi_imputation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np
from typing import Dict, List, Tuple, Any

from src.staging.pg_conversion import sic_to_pg_mapper
from src.imputation.pg_conversion import sic_to_pg_mapper
from src.imputation.impute_civ_def import impute_civil_defence
from src.imputation import expansion_imputation as ximp

Expand Down Expand Up @@ -425,7 +425,6 @@ def calculate_totals(df):

def run_longform_tmi(
longform_df: pd.DataFrame,
sic_mapper: pd.DataFrame,
config: Dict[str, Any],
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Function to run imputation end to end and returns the final
Expand All @@ -434,18 +433,14 @@ def run_longform_tmi(
Args:
longform_df (pd.DataFrame): the dataset filtered for long form entries
target_variables (list): key variables
sic_mapper (pd.DataFrame): dataframe with sic mapper info
config (Dict): the configuration settings
Returns:
final_df: dataframe with the imputed valued added
and counts columns
qa_df: qa dataframe
"""
TMILogger.info("Starting TMI long form imputation.")

# TMI Step 1: impute the Product Group
df = impute_pg_by_sic(longform_df, sic_mapper)

df = longform_df.copy()
# TMI Step 2: impute for R&D type (civil or defence)
df = impute_civil_defence(df)

Expand Down Expand Up @@ -520,15 +515,13 @@ def run_shortform_tmi(

def run_tmi(
full_df: pd.DataFrame,
sic_mapper: pd.DataFrame,
config: Dict[str, Any],
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Function to run imputation end to end and returns the final
dataframe back to the pipeline
dataframe back to the pipeline
Args:
full_df (pd.DataFrame): the full responses spp dataframe
sic_mapper (pd.DataFrame): dataframe with sic to product group mapper info
config (Dict): the configuration settings
Returns:
final_df(pd.DataFrame): dataframe with the imputed valued added and counts columns
Expand All @@ -553,7 +546,7 @@ def run_tmi(
excluded_df = full_df.copy().loc[mor_mask]

# apply TMI imputation to long forms and then short forms
longform_tmi_df, qa_df_long = run_longform_tmi(longform_df, sic_mapper, config)
longform_tmi_df, qa_df_long = run_longform_tmi(longform_df, config)

shortform_tmi_df, qa_df_short = run_shortform_tmi(shortform_df, config)

Expand Down
10 changes: 7 additions & 3 deletions src/outputs/form_output_prep.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pandas as pd
from src.staging.pg_conversion import run_pg_conversion
from src.imputation.pg_conversion import run_pg_conversion
from src.staging.validation import flag_no_rand_spenders


Expand All @@ -8,7 +8,7 @@ def form_output_prep(
weighted_df: pd.DataFrame,
ni_full_responses: pd.DataFrame,
pg_num_alpha: pd.DataFrame,
sic_pg_alpha: pd.DataFrame,
sic_pg_num: pd.DataFrame,
):

"""Prepares the data for the outputs.
Expand Down Expand Up @@ -59,8 +59,12 @@ def form_output_prep(
ni_full_responses["form_status"] = 600
ni_full_responses["602"] = 100
ni_full_responses["formtype"] = "0003"

# Update column 201 (currently PG numeric) to alpha-numeric, mapping from SIC.
ni_full_responses = run_pg_conversion(
ni_full_responses, pg_num_alpha, sic_pg_alpha, target_col="201"
ni_full_responses,
pg_num_alpha,
sic_pg_num
)

# outputs_df = pd.concat([outputs_df, ni_full_responses])
Expand Down
15 changes: 0 additions & 15 deletions src/outputs/gb_sas.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import src.outputs.map_output_cols as map_o
from src.staging.validation import load_schema
from src.outputs.outputs_helpers import create_output_df, regions
from src.staging.pg_conversion import sic_to_pg_mapper

GbSasLogger = logging.getLogger(__name__)

Expand All @@ -19,7 +18,6 @@ def output_gb_sas(
run_id: int,
ultfoc_mapper: pd.DataFrame,
postcode_mapper: pd.DataFrame,
sic_pg_num: pd.DataFrame,
):
"""Run the outputs module.

Expand All @@ -31,8 +29,6 @@ def output_gb_sas(
run_id (int): The current run id
ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame.
postcode_mapper (pd.DataFrame): maps the postcode to region code
pg_alpha_num (pd.DataFrame): mapper of numeric PG to alpha PG

"""

NETWORK_OR_HDFS = config["global"]["network_or_hdfs"]
Expand All @@ -47,20 +43,9 @@ def output_gb_sas(
# Join foriegn ownership column using ultfoc mapper
df1 = map_o.join_fgn_ownership(df1, ultfoc_mapper)

# Fill in numeric PG for short forms and imputed long forms
df1 = sic_to_pg_mapper(
df1,
sic_pg_num,
target_col="pg_numeric",
from_col="SIC 2007_CODE",
to_col="2016 > Form PG",
formtype=["0006", "0001"],
)

# Map to the CORA statuses from the statusencoded column
df1 = map_o.create_cora_status_col(df1)


# Map the sizebands based on frozen employment
df1 = map_o.map_sizebands(df1)

Expand Down
2 changes: 1 addition & 1 deletion src/outputs/ni_sas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import src.outputs.map_output_cols as map_o
from src.staging.validation import load_schema
from src.outputs.outputs_helpers import create_output_df
from src.staging.pg_conversion import sic_to_pg_mapper
from src.imputation.pg_conversion import sic_to_pg_mapper

OutputMainLogger = logging.getLogger(__name__)

Expand Down
6 changes: 2 additions & 4 deletions src/outputs/outputs_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def run_outputs(
civil_defence_detailed (pd.DataFrame): Detailed descriptons of civil/defence
sic_division_detailed (pd.DataFrame): Detailed descriptons of SIC divisions
pg_num_alpha (pd.DataFrame): Mapper for product group conversions (num to alpha)
sic_pg_alpha (pd.DataFrame): Mapper for product group conversions (SIC to alpha)
sic_pg_num (pd.DataFrame): Mapper for product group conversions
"""

(
Expand All @@ -71,7 +71,7 @@ def run_outputs(
weighted_df,
ni_full_responses,
pg_num_alpha,
sic_pg_alpha,
sic_pg_num,
)

# Running status filtered full dataframe output for QA
Expand Down Expand Up @@ -123,7 +123,6 @@ def run_outputs(
run_id,
ultfoc_mapper,
postcode_mapper,
sic_pg_num,
)
OutputMainLogger.info("Finished TAU output.")

Expand All @@ -137,7 +136,6 @@ def run_outputs(
run_id,
ultfoc_mapper,
postcode_mapper,
sic_pg_num,
)
OutputMainLogger.info("Finished GB SAS output.")

Expand Down
Loading
Loading