diff --git a/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_lite_v1.py b/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_lite_v1.py index cacb067..4ce8cbe 100644 --- a/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_lite_v1.py +++ b/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_lite_v1.py @@ -24,44 +24,12 @@ import numpy as np pd.options.mode.chained_assignment = None # default='warn' import utils +import hourly_qff_to_cdm_utils as h_utils # Set the file extension for the subdaily psv files EXTENSION = 'qff' # Dictionaries to hold CDM codes. In due course, read directly from those docs -HEIGHTS = { - "temperature" : "2", - "dew_point_temperature" : "2", - "station_level_pressure" : "2", - "sea_level_pressure" : "2", - "wind_direction" : "10", - "wind_speed" : "10", -} -UNITS = { - "temperature" : "5", - "dew_point_temperature" : "5", - "station_level_pressure" : "32", - "sea_level_pressure" : "32", - "wind_direction" : "320", - "wind_speed" : "731", -} -VARIABLE_ID = { - "temperature" : "85", - "dew_point_temperature" : "36", - "station_level_pressure" : "57", - "sea_level_pressure" : "58", - "wind_direction" : "106", - "wind_speed" : "107", -} -MISSING_DATA = { - "temperature" : -99999, - "dew_point_temperature" : -99999, - "station_level_pressure" : -99999, - "sea_level_pressure" : -99999, - "wind_direction" : -999, - "wind_speed" : -999, -} - INITIAL_COLUMNS = ["observation_id","report_type","date_time","date_time_meaning", "latitude","longitude","observation_height_above_station_surface", "observed_variable","units","observation_value", @@ -133,119 +101,6 @@ def construct_report_type(var_frame, all_frame, id_field): return var_frame -def extract_qc_info(var_frame, all_frame, var_name): - """ - Extract QC information for the QC tables - - var_frame : `dataframe` - Dataframe for variable - - all_frame : `dataframe` - Dataframe for station - - var_name : `str` - Name of variable to use to extract QC information - """ - - var_frame["quality_flag"] = all_frame[f"{var_name}_QC_flag"] - var_frame["qc_method"] = var_frame["quality_flag"] - var_frame["report_id"] = var_frame["date_time"] - - # Set quality flag from master dataframe for variable - # and fill all nan with Null then change all nonnan to 1 - var_frame.loc[var_frame['quality_flag'].notnull(), "quality_flag"] = 1 - var_frame = var_frame.fillna("Null") - var_frame.quality_flag[var_frame.quality_flag == "Null"] = 0 - - return var_frame - - -def overwrite_variable_info(var_frame, var_name): - """ - Replace information for variable with CDM codes - - var_frame : `dataframe` - Dataframe for variable - - var_name : `str` - Name of variable - """ - - var_frame["observation_height_above_station_surface"] = HEIGHTS[var_name] - var_frame["units"] = UNITS[var_name] - var_frame["observed_variable"] = VARIABLE_ID[var_name] - - return var_frame - - -def remove_missing_data_rows(var_frame, var_name): - """ - Remove rows with no data - - var_frame : `dataframe` - Dataframe for variable - - var_name : `str` - Name of variable - """ - - var_frame = var_frame.fillna("null") - var_frame = var_frame.replace({"null" : f"{MISSING_DATA[var_name]}"}) - var_frame = var_frame[var_frame.observation_value != MISSING_DATA[var_name]] - var_frame = var_frame.dropna(subset=['secondary_id']) - var_frame = var_frame.dropna(subset=['observation_value']) - var_frame["source_id"] = pd.to_numeric(var_frame["source_id"], errors='coerce') - - return var_frame - - -def add_data_policy(var_frame, policy_frame): - """ - Merge in data policy information from another dataframe - - 
var_frame : `dataframe` - Dataframe for variable - - policy_frame : `dataframe` - Dataframe for the data policy - """ - - var_frame = var_frame.astype(str) - - # merge policy frame into var_frame - var_frame = policy_frame.merge(var_frame, on=['primary_station_id_2']) - - # rename column and remove ".0" - var_frame['data_policy_licence'] = var_frame['data_policy_licence_x'] - - var_frame['data_policy_licence'] = var_frame['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - - return var_frame - - -def construct_obs_id(var_frame): - """ - construct `observation_id` field - - var_frame : `dataframe` - Dataframe for variable - """ - - # concatenate columns - var_frame['observation_id'] = var_frame['primary_station_id'].astype(str) + "-" + \ - var_frame['record_number'].astype(str) + "-" + \ - var_frame['date_time'].astype(str) - - var_frame['observation_id'] = var_frame['observation_id'].str.replace(r' ', '-') - - # Remove unwanted last two characters - var_frame['observation_id'] = var_frame['observation_id'].str[:-6] - var_frame["observation_id"] = var_frame["observation_id"] + "-" + \ - var_frame['observed_variable'].astype(str) + "-" + \ - var_frame['value_significance'].astype(str) - - return var_frame - def construct_qc_df(var_frame): """ @@ -288,51 +143,6 @@ def construct_qc_df(var_frame): return qc_frame -def fix_decimal_places(var_frame, do_obs_value=True): - """ - Make sure no decimal places remain - or round value to required number of decimal places - - var_frame : `dataframe` - Dataframe for variable - """ - - # remove the decimal places by editing string - var_frame['source_id'] = var_frame['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) - var_frame["source_id"] = pd.to_numeric(var_frame["source_id"], errors='coerce') - - # remove decimal places by editing string - var_frame['data_policy_licence'] = var_frame['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0', '')) - - if do_obs_value: - # Convert to float to allow rounding - var_frame["observation_value"] = pd.to_numeric(var_frame["observation_value"], errors='coerce') - var_frame["observation_value"] = var_frame["observation_value"].round(2) - - return var_frame - - -def construct_extra_ids(var_frame, all_frame, var_name): - """ - Construct source_id and secondary_id fields - - var_frame : `dataframe` - Dataframe for variable - - all_frame : `dataframe` - Dataframe for station - - var_name : `str` - Name of variable to use to extract QC information - """ - - var_frame["source_id"] = all_frame[f"{var_name}_Source_Code"] - var_frame["secondary_id"] = all_frame[f"{var_name}_Source_Station_ID"].astype('str') - var_frame['secondary_id'] = var_frame['secondary_id'].astype(str).apply(lambda x: x.replace('.0', '')) - - return var_frame - - def main(station="", subset="", run_all=False, clobber=False): """ Run processing of hourly QFF to CDM lite & QC tables @@ -474,16 +284,16 @@ def main(station="", subset="", run_all=False, clobber=False): # Change for each variable to convert to CDM compliant values dft["observation_value"] = df["temperature"] + 273.15 - dft = construct_extra_ids(dft, df, "temperature") + dft = h_utils.construct_extra_ids(dft, df, "temperature") # Extract QC information for QC tables - dft = extract_qc_info(dft, df, "temperature") + dft = h_utils.extract_qc_info(dft, df, "temperature", do_report_id=True) # Change for each variable if required - dft = overwrite_variable_info(dft, "temperature") + dft = h_utils.overwrite_variable_info(dft, "temperature") # Remove 
unwanted missing data rows - dft = remove_missing_data_rows(dft, "temperature") + dft = h_utils.remove_missing_data_rows(dft, "temperature") # Concatenate columns for joining dataframe in next step dft['source_id'] = dft['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) @@ -492,13 +302,13 @@ def main(station="", subset="", run_all=False, clobber=False): dft["observation_value"] = pd.to_numeric(dft["observation_value"], errors='coerce') # Add data policy and record number to dataframe - dft = add_data_policy(dft, data_policy_df) + dft = h_utils.add_data_policy(dft, data_policy_df) # Restrict to required columns dft = dft[INTERMED_COLUMNS] # Create observation_id field - dft = construct_obs_id(dft) + dft = h_utils.construct_obs_id(dft) # Set up QC table qct = construct_qc_df(dft) @@ -507,7 +317,7 @@ def main(station="", subset="", run_all=False, clobber=False): dft = dft[FINAL_COLUMNS] # Ensure correct number of decimal places - dft = fix_decimal_places(dft) + dft = h_utils.fix_decimal_places(dft) # ================================================================================= @@ -519,16 +329,16 @@ def main(station="", subset="", run_all=False, clobber=False): # Change for each variable to convert to CDM compliant values dfdpt["observation_value"] = df["dew_point_temperature"] + 273.15 - dfdpt = construct_extra_ids(dfdpt, df, "dew_point_temperature") + dfdpt = h_utils.construct_extra_ids(dfdpt, df, "dew_point_temperature") # Extract QC information for QC tables - dfdpt = extract_qc_info(dfdpt, df, "dew_point_temperature") + dfdpt = h_utils.extract_qc_info(dfdpt, df, "dew_point_temperature", do_report_id=True) # Change for each variable if required - dfdpt = overwrite_variable_info(dfdpt, "dew_point_temperature") + dfdpt = h_utils.overwrite_variable_info(dfdpt, "dew_point_temperature") # Remove unwanted mising data rows - dfdpt = remove_missing_data_rows(dfdpt, "dew_point_temperature") + dfdpt = h_utils.remove_missing_data_rows(dfdpt, "dew_point_temperature") # Concatenate columns for joining dataframe for next step dfdpt['source_id'] = dfdpt['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) @@ -537,13 +347,13 @@ def main(station="", subset="", run_all=False, clobber=False): dfdpt["observation_value"] = pd.to_numeric(dfdpt["observation_value"],errors='coerce') # Add data policy and record numbers to dataframe - dfdpt = add_data_policy(dfdpt, data_policy_df) + dfdpt = h_utils.add_data_policy(dfdpt, data_policy_df) # Restrict to required columns dfdpt = dfdpt[INTERMED_COLUMNS] # Create observation_id field - dfdpt = construct_obs_id(dfdpt) + dfdpt = h_utils.construct_obs_id(dfdpt) # Set up QC table qcdpt = construct_qc_df(dfdpt) @@ -552,7 +362,7 @@ def main(station="", subset="", run_all=False, clobber=False): dfdpt = dfdpt[FINAL_COLUMNS] # Ensure correct number of decimal places - dfdpt = fix_decimal_places(dfdpt) + dfdpt = h_utils.fix_decimal_places(dfdpt) #==================================================================================== # Convert station level pressure to cdmlite @@ -563,16 +373,16 @@ def main(station="", subset="", run_all=False, clobber=False): # Change for each variable to convert to CDM compliant values dfslp["observation_value"] = df["station_level_pressure"] - dfslp = construct_extra_ids(dfslp, df, "station_level_pressure") + dfslp = h_utils.construct_extra_ids(dfslp, df, "station_level_pressure") # Extract QC information for QC tables - dfslp = extract_qc_info(dfslp, df, "station_level_pressure") + dfslp = h_utils.extract_qc_info(dfslp, 
df, "station_level_pressure", do_report_id=True) # Change for each variable if required - dfslp = overwrite_variable_info(dfslp, "station_level_pressure") + dfslp = h_utils.overwrite_variable_info(dfslp, "station_level_pressure") # Remove unwanted missing data rows - dfslp = remove_missing_data_rows(dfslp, "station_level_pressure") + dfslp = h_utils.remove_missing_data_rows(dfslp, "station_level_pressure") # Concatenate columns for joining dataframe for next step dfslp['source_id'] = dfslp['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) @@ -580,13 +390,13 @@ def main(station="", subset="", run_all=False, clobber=False): dfslp['source_id'].astype(str) # Add data policy and record numbers to dataframe - dfslp = add_data_policy(dfslp, data_policy_df) + dfslp = h_utils.add_data_policy(dfslp, data_policy_df) # Restrict to required columns dfslp = dfslp[INTERMED_COLUMNS] # Create observation_id field - dfslp = construct_obs_id(dfslp) + dfslp = h_utils.construct_obs_id(dfslp) # Set up QC table qcslp = construct_qc_df(dfslp) @@ -600,7 +410,7 @@ def main(station="", subset="", run_all=False, clobber=False): dfslp['observation_value'] = (dfslp['observation_value']*100) dfslp['observation_value'] = dfslp['observation_value'].map(int) - dfslp = fix_decimal_places(dfslp, do_obs_value=False) + dfslp = h_utils.fix_decimal_places(dfslp, do_obs_value=False) #=========================================================================================== # Convert sea level pressure to CDM lite @@ -611,16 +421,16 @@ def main(station="", subset="", run_all=False, clobber=False): # Change for each variable to convert to CDM compliant values dfmslp["observation_value"] = df["sea_level_pressure"] - dfmslp = construct_extra_ids(dfmslp, df, "sea_level_pressure") + dfmslp = h_utils.construct_extra_ids(dfmslp, df, "sea_level_pressure") # Extract QC information for QC tables - dfmslp = extract_qc_info(dfmslp, df, "sea_level_pressure") + dfmslp = h_utils.extract_qc_info(dfmslp, df, "sea_level_pressure", do_report_id=True) # Change for each variable if required - dfmslp = overwrite_variable_info(dfmslp, "sea_level_pressure") + dfmslp = h_utils.overwrite_variable_info(dfmslp, "sea_level_pressure") # Remove unwanted missing data rows - dfmslp = remove_missing_data_rows(dfmslp, "sea_level_pressure") + dfmslp = h_utils.remove_missing_data_rows(dfmslp, "sea_level_pressure") # Concatenate columns for joining dataframe for next step dfmslp['source_id'] = dfmslp['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) @@ -628,13 +438,13 @@ def main(station="", subset="", run_all=False, clobber=False): dfmslp['source_id'].astype(str) # Add data policy and record numbers to dataframe - dfmslp = add_data_policy(dfmslp, data_policy_df) + dfmslp = h_utils.add_data_policy(dfmslp, data_policy_df) # Restrict to required columns dfmslp = dfmslp[INTERMED_COLUMNS] # Create observation_id field - dfmslp = construct_obs_id(dfmslp) + dfmslp = h_utils.construct_obs_id(dfmslp) # Set up QC table qcmslp = construct_qc_df(dfmslp) @@ -648,7 +458,7 @@ def main(station="", subset="", run_all=False, clobber=False): dfmslp['observation_value'] = (dfmslp['observation_value']*100) dfmslp['observation_value'] = dfmslp['observation_value'].map(int) - dfmslp = fix_decimal_places(dfmslp, do_obs_value=False) + dfmslp = h_utils.fix_decimal_places(dfmslp, do_obs_value=False) #=================================================================================== # Convert wind direction to CDM lite @@ -659,16 +469,16 @@ def main(station="", 
subset="", run_all=False, clobber=False): # Change for each variable to convert to CDM compliant values dfwd["observation_value"] = df["wind_direction"] - dfwd = construct_extra_ids(dfwd, df, "wind_direction") + dfwd = h_utils.construct_extra_ids(dfwd, df, "wind_direction") # Extract QC information for QC tables - dfwd = extract_qc_info(dfwd, df, "wind_direction") + dfwd = h_utils.extract_qc_info(dfwd, df, "wind_direction", do_report_id=True) # Change for each variable if required - dfwd = overwrite_variable_info(dfwd, "wind_direction") + dfwd = h_utils.overwrite_variable_info(dfwd, "wind_direction") # Remove unwanted missing data rows - dfwd = remove_missing_data_rows(dfwd, "wind_direction") + dfwd = h_utils.remove_missing_data_rows(dfwd, "wind_direction") # Concatenate columns for joining dataframe for next step dfwd['source_id'] = dfwd['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) @@ -676,13 +486,13 @@ def main(station="", subset="", run_all=False, clobber=False): dfwd['source_id'].astype(str) # Add data policy and record numbers to datframe - dfwd = add_data_policy(dfwd, data_policy_df) + dfwd = h_utils.add_data_policy(dfwd, data_policy_df) # Restrict to required columns dfwd = dfwd[INTERMED_COLUMNS] # Create observation_id field - dfwd = construct_obs_id(dfwd) + dfwd = h_utils.construct_obs_id(dfwd) # Set up QC table qcwd = construct_qc_df(dfwd) @@ -692,7 +502,7 @@ def main(station="", subset="", run_all=False, clobber=False): # Make sure no decimal places and round value to reuquired number of decimal places dfwd['observation_value'] = dfwd['observation_value'].astype(str).apply(lambda x: x.replace('.0', '')) - dfwd = fix_decimal_places(dfwd, do_obs_value=False) + dfwd = h_utils.fix_decimal_places(dfwd, do_obs_value=False) #=========================================================================== @@ -704,16 +514,16 @@ def main(station="", subset="", run_all=False, clobber=False): # Change for each variable to convert to CDM compliant values dfws["observation_value"] = df["wind_speed"] - dfws = construct_extra_ids(dfws, df, "wind_speed") + dfws = h_utils.construct_extra_ids(dfws, df, "wind_speed") # Extract QC information for QC tables - dfws = extract_qc_info(dfws, df, "wind_speed") + dfws = h_utils.extract_qc_info(dfws, df, "wind_speed", do_report_id=True) # Change for each variable if required - dfws = overwrite_variable_info(dfws, "wind_speed") + dfws = h_utils.overwrite_variable_info(dfws, "wind_speed") # Remove unwanted missing data rows - dfws = remove_missing_data_rows(dfws, "wind_speed") + dfws = h_utils.remove_missing_data_rows(dfws, "wind_speed") # Concatenate columns for joining dataframe for next step dfws['source_id'] = dfws['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) @@ -723,13 +533,13 @@ def main(station="", subset="", run_all=False, clobber=False): #dft.to_csv("ttest.csv", index=False, sep=",") # Add data policy and record numbers to datafram - dfws = add_data_policy(dfws, data_policy_df) + dfws = h_utils.add_data_policy(dfws, data_policy_df) # Restrict to required columns dfws = dfws[INTERMED_COLUMNS] # Create observation_id field - dfws = construct_obs_id(dfws) + dfws = h_utils.construct_obs_id(dfws) # QC flag tables qcws = construct_qc_df(dfws) @@ -740,7 +550,7 @@ def main(station="", subset="", run_all=False, clobber=False): dfws = dfws[FINAL_COLUMNS] # Ensure correct number of decimal places - dfws = fix_decimal_places(dfws) + dfws = h_utils.fix_decimal_places(dfws) # 
================================================================================= diff --git a/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_obs_v1.py b/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_obs_v1.py index fdcb92b..1ac9aa3 100644 --- a/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_obs_v1.py +++ b/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_obs_v1.py @@ -23,10 +23,74 @@ import pandas as pd pd.options.mode.chained_assignment = None # default='warn' import utils +import hourly_qff_to_cdm_utils as h_utils # Set the file extension for the subdaily psv files EXTENSION = 'qff' + +CONVERSION_FLAGS = { + "temperature" : "0", + "dew_point_temperature" : "0", + "station_level_pressure" : "0", + "sea_level_pressure" : "0", + "wind_direction" : "1", + "wind_speed" : "2", +} +CONVERSION_METHODS = { + "temperature" : "1", + "dew_point_temperature" : "1", + "station_level_pressure" : "7", + "sea_level_pressure" : "7", + "wind_direction" : "", + "wind_speed" : "", +} +NUMERICAL_PRECISION = { + "temperature" : "0.01", + "dew_point_temperature" : "0.01", + "station_level_pressure" : "10", + "sea_level_pressure" : "10", + "wind_direction" : "1", + "wind_speed" : "0.1", +} +ORIGINAL_PRECISION = { + "temperature" : "0.1", + "dew_point_temperature" : "0.1", + "station_level_pressure" : "0.1", + "sea_level_pressure" : "0.1", + "wind_direction" : "1", + "wind_speed" : "0.1", +} +ORIGINAL_UNITS = { + "temperature" : "60", + "dew_point_temperature" : "60", + "station_level_pressure" : "530", + "sea_level_pressure" : "530", + "wind_direction" : "110", + "wind_speed" : "731", +} + +def overwrite_conversion_precision_info(var_frame, var_name): + """ + Replace information for variable with CDM codes + + var_frame : `dataframe` + Dataframe for variable + + var_name : `str` + Name of variable + """ + + var_frame["conversion_flag"] = CONVERSION_FLAGS[var_name] + var_frame["conversion_method"] = CONVERSION_METHODS[var_name] + var_frame["numerical_precision"] = NUMERICAL_PRECISION[var_name] + var_frame["original_precision"] = ORIGINAL_PRECISION[var_name] + var_frame["original_units"] = ORIGINAL_UNITS[var_name] + + return var_frame + + + def main(station="", subset="", run_all=False, clobber=False): """ Run processing of hourly QFF to CDM obs @@ -81,19 +145,33 @@ def main(station="", subset="", run_all=False, clobber=False): return elif all: print(f"All stations run in {utils.SUBDAILY_QFF_IN_DIR}") - all_filenames = [i for i in glob.glob(os.path.join(utils.SUBDAILY_QFF_IN_DIR, f'*.{EXTENSION}'))] + all_filenames = [i for i in glob.glob(os.path.join(utils.SUBDAILY_QFF_IN_DIR, + f'*.{EXTENSION}'))] print(f" N = {len(all_filenames)}") + # Read in the data policy dataframe (only read in if needed) + data_policy_df = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1') + data_policy_df = data_policy_df.astype(str) + # To start at begining of files for filename in all_filenames: - print(f"Processing {filename}") + + if not os.path.exists(os.path.join(utils.SUBDAILY_QFF_IN_DIR, filename)): + print("Input QFF file missing: {}".format(os.path.join(utils.SUBDAILY_QFF_IN_DIR, + filename))) + continue + else: + print("Processing {}".format(os.path.join(utils.SUBDAILY_QFF_IN_DIR, filename))) + + # Read in the dataframe df=pd.read_csv(os.path.join(utils.SUBDAILY_QFF_IN_DIR, filename), sep="|",low_memory=False) - # Set up the output filenames, and check if they exist + + # Set up the output filenames, and check if they exist station_id=df.iloc[1]["Station_ID"] # NOTE: this is renamed below to 
"primary_station_id" outroot_cdmobs = os.path.join(utils.SUBDAILY_CDM_OBS_OUT_DIR, utils.SUBDAILY_CDM_OBS_FILE_ROOT) cdmobs_outfile = f"{outroot_cdmobs}{station_id}.psv" - # if not overwriting + # if not overwriting if not clobber: # and both output files exist if os.path.exists(cdmobs_outfile): @@ -101,672 +179,533 @@ def main(station="", subset="", run_all=False, clobber=False): print(f" {cdmobs_outfile}") print(" Skipping to next station") continue - # to next file in the loop + # to next file in the loop - # set up master df to extrcat each variable - df["report_id"]="" - df["observation_id"]="" - df["data_policy_licence"]="" - df["date_time_meaning"]="1" - df["observation_duration"]="0" - df["latitude"]=df["Latitude"] - df["longitude"]=df["Longitude"] - df["crs"]="" - df["z_coordinate"]="" - df["z_coordinate_type"]="" - df["observation_height_above_station_surface"]="" - df["observed_variable"]="" - df["secondary_variable"]="" - df["observation_value"]="" - df["value_significance"]="12" - df["secondary_value"]="" - df["units"]="" - df["code_table"]="" - df["conversion_flag"]="" - df["location_method"]="" - df["location_precision"]="" - df["z_coordinate_method"]="" - df["bbox_min_longitude"]="" - df["bbox_max_longitude"]="" - df["bbox_min_latitude"]="" - df["bbox_max_latitude"]="" - df["spatial_representativeness"]="" - df["original_code_table"]="" - df["quality_flag"]="" - df["numerical_precision"]="" - df["sensor_id"]="" - df["sensor_automation_status"]="" - df["exposure_of_sensor"]="" - df["original_precision"]="" - df["original_units"]="" - df["original_code_table"]="" - df["original_value"]="" - df["conversion_method"]="" - df["processing_code"]="" - df["processing_level"]="0" - df["adjustment_id"]="" - df["traceability"]="" - df["advanced_qc"]="" - df["advanced_uncertainty"]="" - df["advanced_homogenisation"]="" - df["advanced_assimilation_feedback"]="" - df["secondary_id"]="" - df["source_id"]="" - df["source_record_id"]="" - df["primary_station_id"]=df["Station_ID"] - df["Timestamp2"] = df["Year"].map(str) + "-" + df["Month"].map(str)+ "-" + df["Day"].map(str) - df["Seconds"]="00" - df["offset"]="+00" - df["date_time"] = df["Timestamp2"].map(str)+ " " + df["Hour"].map(str)+":"+df["Minute"].map(str)+":"+df["Seconds"].map(str) - df['date_time'] = pd.to_datetime(df['date_time'], format='%Y/%m/%d' " ""%H:%M") - df['date_time'] = df['date_time'].astype('str') - df.date_time = df.date_time + '+00' + # Set up master df to extract each variable + # Globally set some entries to + df["report_id"] = "" + df["observation_id"] = "" + df["data_policy_licence"] = "" + df["date_time_meaning"] = "1" + df["observation_duration"] = "0" + df["latitude"] = df["Latitude"] + df["longitude"] = df["Longitude"] + df["crs"] = "" + df["z_coordinate"] = "" + df["z_coordinate_type"] = "" + df["observation_height_above_station_surface"] = "" + df["observed_variable"] = "" + df["secondary_variable"] = "" + df["observation_value"] = "" + df["value_significance"] = "12" + df["secondary_value"] = "" + df["units"] = "" + df["code_table"] = "" + df["conversion_flag"] = "" + df["location_method"] = "" + df["location_precision"] = "" + df["z_coordinate_method"] = "" + df["bbox_min_longitude"] = "" + df["bbox_max_longitude"] = "" + df["bbox_min_latitude"] = "" + df["bbox_max_latitude"] = "" + df["spatial_representativeness"] = "" + df["original_code_table"] = "" + df["quality_flag"] = "" + df["numerical_precision"] = "" + df["sensor_id"] = "" + df["sensor_automation_status"] = "" + df["exposure_of_sensor"] = "" + 
df["original_precision"] = "" + df["original_units"] = "" + df["original_code_table"] = "" + df["original_value"] = "" + df["conversion_method"] = "" + df["processing_code"] = "" + df["processing_level"] = "0" + df["adjustment_id"] = "" + df["traceability"] = "" + df["advanced_qc"] = "" + df["advanced_uncertainty"] = "" + df["advanced_homogenisation"] = "" + df["advanced_assimilation_feedback"] = "" + df["secondary_id"] = "" + df["source_id"] = "" + df["source_record_id"] = "" + df["primary_station_id"] = df["Station_ID"] + df["Timestamp2"] = df["Year"].map(str) + "-" +\ + df["Month"].map(str) + "-" +\ + df["Day"].map(str) + df["Seconds"] = "00" + df["offset"] = "+00" + df["date_time"] = df["Timestamp2"].map(str) + " " + \ + df["Hour"].map(str) + ":" + \ + df["Minute"].map(str) + ":" + \ + df["Seconds"].map(str) + df['date_time'] = pd.to_datetime(df['date_time'], format='%Y/%m/%d' " ""%H:%M") + df['date_time'] = df['date_time'].astype('str') + df.date_time = df.date_time + '+00' + + + # ========================================================================================= + #convert temperature changes for each variable + dft = df[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] + + # change for each variable to convert to cdm compliant values + dft["observation_value"] = df["temperature"] + 273.15 + dft["original_value"] = df["temperature"] + + dft = h_utils.construct_extra_ids(dft, df, "temperature") + + dft = overwrite_conversion_precision_info(dft, "temperature") + + # Extract QC information + dft = h_utils.extract_qc_info(dft, df, "temperature") - -#========================================================================================= - #convert temperature changes for each variable - dft = df[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - 
"advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] - - # change for each variable to convert to cdm compliant values - dft["observation_value"]=df["temperature"]+273.15 - dft["secondary_id"]=df["temperature_Source_Station_ID"].astype('str') - dft['secondary_id'] = dft['secondary_id'].astype(str).apply(lambda x: x.replace('.0','')) - dft["source_id"]=df["temperature_Source_Code"] - dft["Seconds"]="00" - dft["quality_flag"]=df["temperature_QC_flag"] - dft["qc_method"]=dft["quality_flag"] - dft["conversion_flag"]="0" - dft["conversion_method"]="1" - dft["numerical_precision"]="0.01" - dft["original_precision"]="0.1" - dft["original_units"]="60" - dft["original_value"]=df["temperature"] - dft["observation_height_above_station_surface"]="2" - dft["units"]="5" - dft["observed_variable"]="85" - - - # set quality flag from df master for variable and fill all nan with Null then change all nonnan to - dft.loc[dft['quality_flag'].notnull(), "quality_flag"] = 1 - dft = dft.fillna("Null") - dft.quality_flag[dft.quality_flag == "Null"] = 0 - # change for each variable if required - - # remove unwanted mising data rows - dft = dft.fillna("null") - dft = dft.replace({"null":"-99999"}) - dft = dft[dft.observation_value != -99999] - dft = dft.dropna(subset=['secondary_id']) - dft = dft.dropna(subset=['observation_value']) - dft["source_id"] = pd.to_numeric(dft["source_id"],errors='coerce') - - #concatenate columns for joining df for next step - dft['source_id'] = dft['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dft['primary_station_id_2']=dft['secondary_id'].astype(str)+'-'+dft['source_id'].astype(str) - dft["observation_value"] = pd.to_numeric(dft["observation_value"],errors='coerce') + # Change for each variable if required + dft = h_utils.overwrite_variable_info(dft, "temperature") + + # remove unwanted mising data rows + dft = h_utils.remove_missing_data_rows(dft, "temperature") + + # concatenate columns for joining df for next step + dft['source_id'] = dft['source_id'].astype(str).apply(lambda x: x.replace('.0', '')) + dft['primary_station_id_2'] = dft['secondary_id'].astype(str) + '-' + dft['source_id'].astype(str) + dft["observation_value"] = pd.to_numeric(dft["observation_value"], errors='coerce') + + # add data policy and record number to df + dft = h_utils.add_data_policy(dft, data_policy_df) + + # Create observation_id field + dft = h_utils.construct_obs_id(dft) - # add data policy and record number to df - df2 = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1') - dft = dft.astype(str) - df2 = df2.astype(str) - dft= df2.merge(dft, on=['primary_station_id_2']) - dft['data_policy_licence'] = dft['data_policy_licence_x'] - dft['data_policy_licence'] = dft['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dft['observation_id']=dft['primary_station_id'].astype(str)+'-'+dft['record_number'].astype(str)+'-'+dft['date_time'].astype(str) - dft['observation_id'] = dft['observation_id'].str.replace(r' ', '-') - ##remove unwanted last two characters - dft['observation_id'] = dft['observation_id'].str[:-6] - dft["observation_id"]=dft["observation_id"]+'-'+dft['observed_variable'].astype(str)+'-'+dft['value_significance'].astype(str) - dft["report_id"]=dft["observation_id"].str[:-6] - - dft = dft[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - 
"crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id"]] - df.dropna(subset = ["observation_value"], inplace=True) - dft['source_id'] = dft['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dft['data_policy_licence'] = dft['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dft["source_id"] = pd.to_numeric(dft["source_id"],errors='coerce') - dft["observation_value"] = pd.to_numeric(dft["observation_value"],errors='coerce') - dft["observation_value"]= dft["observation_value"].round(2) - - - + dft["report_id"] = dft["observation_id"].str[:-6] + + dft = dft[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + "advanced_assimilation_feedback","source_id"]] + df.dropna(subset = ["observation_value"], inplace=True) + + # Ensure correct number of decimal places + dft = h_utils.fix_decimal_places(dft, do_obs_value=True) + + + # ================================================================================= + # convert dew point temperature changes for each variable + dfdpt= df[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + 
"advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] + + # change for each variable to convert to cdm compliant values + dfdpt["observation_value"]=df["dew_point_temperature"]+273.15 + dfdpt["original_value"]=df["dew_point_temperature"] + + dfdpt = h_utils.construct_extra_ids(dfdpt, df, "dew_point_temperature") + + dfdpt = overwrite_conversion_precision_info(dfdpt, "dew_point_temperature") + + # Extract QC information + dfdpt = h_utils.extract_qc_info(dfdpt, df, "dew_point_temperature") + + # Change for each variable if required + dfdpt = h_utils.overwrite_variable_info(dfdpt, "dew_point_temperature") + + # remove unwanted mising data rows + dfdpt = h_utils.remove_missing_data_rows(dfdpt, "dew_point_temperature") + + # concatenate columns for joining df for next step + dfdpt['source_id'] = dfdpt['source_id'].astype(str).apply(lambda x: x.replace('.0','')) + dfdpt['primary_station_id_2']=dfdpt['secondary_id'].astype(str)+'-'+dfdpt['source_id'].astype(str) + dfdpt["observation_value"] = pd.to_numeric(dfdpt["observation_value"],errors='coerce') + + # add data policy and record number to df + dfdpt = h_utils.add_data_policy(dfdpt, data_policy_df) + + # Create observation_id field + dfdpt = h_utils.construct_obs_id(dfdpt) + + dfdpt["report_id"]=dfdpt["observation_id"].str[:-6] + + dfdpt= dfdpt[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + "advanced_assimilation_feedback","source_id"]] + dfdpt.dropna(subset = ["observation_value"], inplace=True) + + # Ensure correct number of decimal places + dfdpt = h_utils.fix_decimal_places(dfdpt, do_obs_value=True) + + + #==================================================================================== + # convert station level pressure + + dfslp = df[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + 
"advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] + + # change for each variable to convert to cdm compliant values + dfslp["observation_value"] = df["station_level_pressure"].map(float) + dfslp["original_value"] = df["station_level_pressure"] + + dfslp = h_utils.construct_extra_ids(dfslp, df, "station_level_pressure") + + dfslp = overwrite_conversion_precision_info(dfslp, "station_level_pressure") + + # Extract QC information + dfslp = h_utils.extract_qc_info(dfslp, df, "station_level_pressure") - #================================================================================= - # convert dew point temperature changes for each variable - dfdpt= df[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] - - # change for each variable to convert to cdm compliant values - dfdpt["observation_value"]=df["dew_point_temperature"]+273.15 - dfdpt["secondary_id"]=df["dew_point_temperature_Source_Station_ID"].astype(str) - dfdpt['secondary_id'] = dfdpt['secondary_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfdpt["source_id"]=df["dew_point_temperature_Source_Code"] - dfdpt["Seconds"]="00" - dfdpt["quality_flag"]=df["dew_point_temperature_QC_flag"] - dfdpt["conversion_flag"]="0" - dfdpt["conversion_method"]="1" - dfdpt["numerical_precision"]="0.01" - dfdpt["original_precision"]="0.1" - dfdpt["original_units"]="60" - dfdpt["original_value"]=df["dew_point_temperature"] - dfdpt["observation_height_above_station_surface"]="2" - dfdpt["units"]="5" - dfdpt["observed_variable"]="36" - - - # set quality flag from df master for variable and fill all nan with Null then change all nonnan to - dfdpt.loc[dfdpt['quality_flag'].notnull(), "quality_flag"] = 1 - dfdpt= dfdpt.fillna("Null") - dfdpt.quality_flag[dfdpt.quality_flag == "Null"] = 0 - - # remove unwanted mising data rows - dfdpt= dfdpt.fillna("null") - dfdpt= dfdpt.replace({"null":"-99999"}) - dfdpt= dfdpt[dfdpt.observation_value != -99999] - dfdpt = dfdpt.dropna(subset=['secondary_id']) - dfdpt = dfdpt.dropna(subset=['observation_value']) - dfdpt["source_id"] = pd.to_numeric(dfdpt["source_id"],errors='coerce') - - #concatenate columns for joining df for next step - dfdpt['source_id'] = dfdpt['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfdpt['primary_station_id_2']=dfdpt['secondary_id'].astype(str)+'-'+dfdpt['source_id'].astype(str) - dfdpt["observation_value"] = pd.to_numeric(dfdpt["observation_value"],errors='coerce') - - # add data policy and record number to df - df2 = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1') - dfdpt= dfdpt.astype(str) - df2 = 
df2.astype(str) - dfdpt= df2.merge(dfdpt, on=['primary_station_id_2']) - dfdpt['data_policy_licence'] = dfdpt['data_policy_licence_x'] - dfdpt['data_policy_licence'] = dfdpt['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dfdpt['observation_id']=dfdpt['primary_station_id'].astype(str)+'-'+dfdpt['record_number'].astype(str)+'-'+dfdpt['date_time'].astype(str) - dfdpt['observation_id'] = dfdpt['observation_id'].str.replace(r' ', '-') - # remove unwanted last twpo characters - dfdpt['observation_id'] = dfdpt['observation_id'].str[:-6] - dfdpt["observation_id"]=dfdpt["observation_id"]+'-'+dfdpt['observed_variable'].astype(str)+'-'+dfdpt['value_significance'].astype(str) - dfdpt["report_id"]=dfdpt["observation_id"].str[:-6] - - - dfdpt= dfdpt[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id"]] - dfdpt.dropna(subset = ["observation_value"], inplace=True) - dfdpt['source_id'] = dfdpt['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfdpt['data_policy_licence'] = dfdpt['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dfdpt["source_id"] = pd.to_numeric(dfdpt["source_id"],errors='coerce') - dfdpt["observation_value"] = pd.to_numeric(dfdpt["observation_value"],errors='coerce') - dfdpt["observation_value"]= dfdpt["observation_value"].round(2) - - + # Change for each variable if required + dfslp = h_utils.overwrite_variable_info(dfslp, "station_level_pressure") - #==================================================================================== - # convert station level - - dfslp = df[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] - - # change for each variable to convert to cdm compliant values - 
dfslp["secondary_id"]=df["station_level_pressure_Source_Station_ID"].astype(str) - dfslp['secondary_id'] = dfslp['secondary_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfslp["observation_value"]=df["station_level_pressure"].map(float) - dfslp = dfslp.dropna(subset=['observation_value']) - dfslp["source_id"]=df["station_level_pressure_Source_Code"] - dfslp["Seconds"]="00" - dfslp["quality_flag"]=df["station_level_pressure_QC_flag"] - dfslp["conversion_flag"]="0" - dfslp["conversion_method"]="7" - dfslp["numerical_precision"]="10" - dfslp["original_precision"]="0.1" - dfslp["original_units"]="530" - dfslp["original_value"]=df["station_level_pressure"] - dfslp["observation_height_above_station_surface"]="2" - dfslp["units"]="32" - dfslp["observed_variable"]="57" - - - # set quality flag from df master for variable and fill all nan with Null then change all nonnan to - dfslp.loc[dfslp['quality_flag'].notnull(), "quality_flag"] = 1 - dfslp = dfslp.fillna("Null") - dfslp.quality_flag[dfslp.quality_flag == "Null"] = 0 - # change for each variable if required - - # remove unwanted mising data rows - dfslp = dfslp.fillna("null") - dfslp = dfslp.replace({"null":"-99999"}) - dfslp = dfslp[dfslp.observation_value != -99999] - dfslp = dfslp.dropna(subset=['secondary_id']) - dfslp = dfslp.dropna(subset=['observation_value']) - dfslp["source_id"] = pd.to_numeric(dfslp["source_id"],errors='coerce') - - # concatenate columns for joining df for next step - dfslp['source_id'] = dfslp['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfslp['primary_station_id_2']=dfslp['secondary_id'].astype(str)+'-'+dfslp['source_id'].astype(str) - - # add data policy and record number to df - df2 = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1') - dfslp = dfslp.astype(str) - df2 = df2.astype(str) - dfslp= df2.merge(dfslp, on=['primary_station_id_2']) - dfslp['data_policy_licence'] = dfslp['data_policy_licence_x'] - dfslp['data_policy_licence'] = dfslp['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dfslp['observation_id']=dfslp['primary_station_id'].astype(str)+'-'+dfslp['record_number'].astype(str)+'-'+dfslp['date_time'].astype(str) - dfslp['observation_id'] = dfslp['observation_id'].str.replace(r' ', '-') - # remove unwanted last two characters - dfslp['observation_id'] = dfslp['observation_id'].str[:-6] - dfslp["observation_id"]=dfslp["observation_id"]+'-'+dfslp['observed_variable'].astype(str)+'-'+dfslp['value_significance'].astype(str) - dfslp["report_id"]=dfslp["observation_id"].str[:-6] - - dfslp = dfslp[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id"]] - - - # make 
sure no decimal places an dround value to reuqred decimal places
-            dfslp['observation_value'] = dfslp['observation_value'].map(float)
-            dfslp['observation_value'] = (dfslp['observation_value']*100)
-            dfslp['observation_value'] = dfslp['observation_value'].map(int)
-            dfslp['source_id'] = dfslp['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
-            dfslp['data_policy_licence'] = dfslp['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0',''))
-            dfslp["source_id"] = pd.to_numeric(dfslp["source_id"],errors='coerce')
-            dfslp["observation_value"] = pd.to_numeric(dfslp["observation_value"],errors='coerce')
-            dfslp['observation_value'] = dfslp['observation_value'].astype(str).apply(lambda x: x.replace('.0',''))
-
-
+            # remove unwanted missing data rows
+            dfslp = h_utils.remove_missing_data_rows(dfslp, "station_level_pressure")
+
+            # concatenate columns for joining df for next step
+            dfslp['source_id'] = dfslp['source_id'].astype(str).apply(lambda x: x.replace('.0', ''))
+            dfslp['primary_station_id_2'] = dfslp['secondary_id'].astype(str) + '-' + dfslp['source_id'].astype(str)
-
-            #===========================================================================================
-            # convert sea level presure
-
-            dfmslp = df[["observation_id","report_id","data_policy_licence","date_time",
-                "date_time_meaning","observation_duration","longitude","latitude",
-                "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
-                "observed_variable","secondary_variable","observation_value",
-                "value_significance","secondary_value","units","code_table",
-                "conversion_flag","location_method","location_precision",
-                "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
-                "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
-                "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
-                "exposure_of_sensor","original_precision","original_units",
-                "original_code_table","original_value","conversion_method",
-                "processing_code","processing_level","adjustment_id","traceability",
-                "advanced_qc","advanced_uncertainty","advanced_homogenisation",
-                "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]]
-
-            # change for each variable to convert to cdm compliant values
-            dfmslp["secondary_id"]=df["sea_level_pressure_Source_Station_ID"].astype(str)
-            dfmslp['secondary_id'] = dfmslp['secondary_id'].astype(str).apply(lambda x: x.replace('.0',''))
-            dfmslp["observation_value"]=df["sea_level_pressure"].map(float)
-            dfmslp = dfmslp.dropna(subset=['observation_value'])
-            dfmslp["source_id"]=df["sea_level_pressure_Source_Code"]
-            dfmslp["Seconds"]="00"
-            dfmslp["quality_flag"]=df["sea_level_pressure_QC_flag"]
-            dfmslp["conversion_flag"]="0"
-            dfmslp["conversion_method"]="7"
-            dfmslp["numerical_precision"]="10"
-            dfmslp["original_precision"]="0.1"
-            dfmslp["original_units"]="530"
-            dfmslp["original_value"]=df["sea_level_pressure"]
-            dfmslp["observation_height_above_station_surface"]="2"
-            dfmslp["units"]="32"
-            dfmslp["observed_variable"]="58"
-
-
-            # set quality flag from df master for variable and fill all nan with Null then change all nonnan to
-            dfmslp.loc[dfmslp['quality_flag'].notnull(), "quality_flag"] = 1
-            dfmslp = dfmslp.fillna("Null")
-            dfmslp.quality_flag[dfmslp.quality_flag == "Null"] = 0
-            # change for each variable if required
-
-            # remove unwanted mising data rows
-            dfmslp = dfmslp.fillna("null")
-            dfmslp = dfmslp.replace({"null":"-99999"})
-            dfmslp = dfmslp[dfmslp.observation_value != -99999]
-            dfmslp =
dfmslp.dropna(subset=['secondary_id']) - dfmslp = dfmslp.dropna(subset=['observation_value']) - dfmslp["source_id"] = pd.to_numeric(dfmslp["source_id"],errors='coerce') - - # concatenate columns for joining df for next step - dfmslp['source_id'] = dfmslp['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfmslp['primary_station_id_2']=dfmslp['secondary_id'].astype(str)+'-'+dfmslp['source_id'].astype(str) + # add data policy and record number to df + dfslp = h_utils.add_data_policy(dfslp, data_policy_df) + + # Create observation_id field + dfslp = h_utils.construct_obs_id(dfslp) - # add data policy and record number to df - df2 = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1') - dfmslp = dfmslp.astype(str) - df2 = df2.astype(str) - dfmslp= df2.merge(dfmslp, on=['primary_station_id_2']) - dfmslp['data_policy_licence'] = dfmslp['data_policy_licence_x'] - dfmslp['data_policy_licence'] = dfmslp['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - - dfmslp['observation_id']=dfmslp['primary_station_id'].astype(str)+'-'+dfmslp['record_number'].astype(str)+'-'+dfmslp['date_time'].astype(str) - dfmslp['observation_id'] = dfmslp['observation_id'].str.replace(r' ', '-') - # remove unwanted last twpo characters - dfmslp['observation_id'] = dfmslp['observation_id'].str[:-6] - dfmslp["observation_id"]=dfmslp["observation_id"]+'-'+dfmslp['observed_variable'].astype(str)+'-'+dfmslp['value_significance'].astype(str) - dfmslp["report_id"]=dfmslp["observation_id"].str[:-6] - - - dfmslp = dfmslp[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id"]] - - # make sure no decimal places and round value to required decimal places - - dfmslp['observation_value'] = dfmslp['observation_value'].map(float) - dfmslp['observation_value'] = (dfmslp['observation_value']*100) - dfmslp['observation_value'] = dfmslp['observation_value'].map(int) - dfmslp['source_id'] = dfmslp['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfmslp["observation_value"] = pd.to_numeric(dfmslp["observation_value"],errors='coerce') - dfmslp['data_policy_licence'] = dfmslp['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dfmslp["source_id"] = pd.to_numeric(dfmslp["source_id"],errors='coerce') - dfmslp['observation_value'] = dfmslp['observation_value'].astype(str).apply(lambda x: x.replace('.0','')) - - + dfslp["report_id"]=dfslp["observation_id"].str[:-6] + + dfslp = dfslp[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + 
"crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + "advanced_assimilation_feedback","source_id"]] + + dfslp['observation_value'] = dfslp['observation_value'].map(float) + dfslp['observation_value'] = (dfslp['observation_value']*100) + dfslp['observation_value'] = dfslp['observation_value'].map(int) + + # Ensure correct number of decimal places + dft = h_utils.fix_decimal_places(dft, do_obs_value=True) + + + #=========================================================================================== + # convert sea level presure + + dfmslp = df[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] + + # change for each variable to convert to cdm compliant values + dfmslp["observation_value"]=df["sea_level_pressure"].map(float) + dfmslp["original_value"]=df["sea_level_pressure"] + + dfmslp = h_utils.construct_extra_ids(dfmslp, df, "sea_level_pressure") - #======================================================================================================== - # wind direction convert - - dfwd = df[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - 
"advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] - - # change for each variable to convert to cdm compliant values - dfwd["secondary_id"]=df["wind_direction_Source_Station_ID"].astype(str) - dfwd['secondary_id'] = dfwd['secondary_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfwd["observation_value"]=df["wind_direction"] - dfwd["source_id"]=df["wind_direction_Source_Code"] - dfwd["Seconds"]="00" - dfwd["quality_flag"]=df["wind_direction_QC_flag"] - dfwd["conversion_flag"]="1" - dfwd["conversion_method"]="" - dfwd["numerical_precision"]="1" - dfwd["original_precision"]="1" - dfwd["original_units"]="110" - dfwd["original_value"]=df["wind_direction"] - dfwd["observation_height_above_station_surface"]="10" - dfwd["units"]="110" - dfwd["observed_variable"]="106" - - - # set quality flag from df master for variable and fill all nan with Null then change all nonnan to - dfwd.loc[dfwd['quality_flag'].notnull(), "quality_flag"] = 1 - dfwd = dfwd.fillna("Null") - dfwd.quality_flag[dfwd.quality_flag == "Null"] = 0 - # change for each variable if required - - # remove unwanted mising data rows - dfwd = dfwd.fillna("null") - dfwd = dfwd.replace({"null":"-99999"}) - dfwd = dfwd[dfwd.observation_value != -99999] - dfwd = dfwd.dropna(subset=['secondary_id']) - dfwd = dfwd.dropna(subset=['observation_value']) - dfwd["source_id"] = pd.to_numeric(dfwd["source_id"],errors='coerce') - - # concatenate columns for joining df for next step - dfwd['source_id'] = dfwd['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfwd['primary_station_id_2']=dfwd['secondary_id'].astype(str)+'-'+dfwd['source_id'].astype(str) - dfwd["observation_value"] = pd.to_numeric(dfwd["observation_value"],errors='coerce') - - # add data policy and record number to df - df2 = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1') - dfwd = dfwd.astype(str) - df2 = df2.astype(str) - dfwd= df2.merge(dfwd, on=['primary_station_id_2']) - dfwd['data_policy_licence'] = dfwd['data_policy_licence_x'] - dfwd['data_policy_licence'] = dfwd['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - - dfwd['observation_id']=dfwd['primary_station_id'].astype(str)+'-'+dfwd['record_number'].astype(str)+'-'+dfwd['date_time'].astype(str) - dfwd['observation_id'] = dfwd['observation_id'].str.replace(r' ', '-') - - # remove unwanted last two characters - dfwd['observation_id'] = dfwd['observation_id'].str[:-6] - dfwd["observation_id"]=dfwd["observation_id"]+'-'+dfwd['observed_variable'].astype(str)+'-'+dfwd['value_significance'].astype(str) - dfwd["report_id"]=dfwd["observation_id"].str[:-7] - - dfwd = dfwd[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - 
"advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id"]] + dfmslp = overwrite_conversion_precision_info(dfmslp, "sea_level_pressure") + + # Extract QC information + dfmslp = h_utils.extract_qc_info(dfmslp, df, "sea_level_pressure") - + # Change for each variable if required + dfmslp = h_utils.overwrite_variable_info(dfmslp, "sea_level_pressure") - # make sure no decimal places an dround value to reuqred decimal places - dfwd.dropna(subset = ["observation_value"], inplace=True) - dfwd['observation_value'] = dfwd['observation_value'].astype(str).apply(lambda x: x.replace('.0','')) - dfwd['source_id'] = dfwd['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfwd['data_policy_licence'] = dfwd['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0','')) - dfwd["source_id"] = pd.to_numeric(dfwd["source_id"],errors='coerce') - - - #=========================================================================== - # wind speed convert - - dfws = df[["observation_id","report_id","data_policy_licence","date_time", - "date_time_meaning","observation_duration","longitude","latitude", - "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", - "observed_variable","secondary_variable","observation_value", - "value_significance","secondary_value","units","code_table", - "conversion_flag","location_method","location_precision", - "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", - "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", - "quality_flag","numerical_precision","sensor_id","sensor_automation_status", - "exposure_of_sensor","original_precision","original_units", - "original_code_table","original_value","conversion_method", - "processing_code","processing_level","adjustment_id","traceability", - "advanced_qc","advanced_uncertainty","advanced_homogenisation", - "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]] - - # change for each variable to convert to cdm compliant values - dfws["secondary_id"]=df["wind_speed_Source_Station_ID"].astype(str) - dfws['secondary_id'] = dfws['secondary_id'].astype(str).apply(lambda x: x.replace('.0','')) - dfws["observation_value"]=df["wind_speed"] - dfws["source_id"]=df["wind_speed_Source_Code"] - dfws["Seconds"]="00" - dfws["quality_flag"]=df["wind_speed_QC_flag"] - dfws["conversion_flag"]="2" - dfws["conversion_method"]="" - dfws["numerical_precision"]="0.1" - dfws["original_precision"]="0.1" - dfws["original_units"]="731" - dfws["original_value"]=df["wind_speed"] - dfws["observation_height_above_station_surface"]="10" - dfws["units"]="731" - dfws["observed_variable"]="107" - - - # set quality flag from df master for variable and fill all nan with Null then change all non-nan to - dfws.loc[dfws['quality_flag'].notnull(), "quality_flag"] = 1 - dfws = dfws.fillna("Null") - dfws.quality_flag[dfws.quality_flag == "Null"] = 0 - # change for each variable if required - - # remove unwanted mising data rows - dfws = dfws.fillna("null") - dfws = dfws.replace({"null":"-99999"}) - dfws = dfws[dfws.observation_value != -99999] - dfws = dfws.dropna(subset=['secondary_id']) - dfws = dfws.dropna(subset=['observation_value']) - dfws["source_id"] = pd.to_numeric(dfws["source_id"],errors='coerce') - - # concatenate columns for joining df for next step - dfws['source_id'] = dfws['source_id'].astype(str).apply(lambda x: x.replace('.0','')) - 
-
-    # make sure no decimal places an dround value to reuqred decimal places
-    dfwd.dropna(subset = ["observation_value"], inplace=True)
-    dfwd['observation_value'] = dfwd['observation_value'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfwd['source_id'] = dfwd['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfwd['data_policy_licence'] = dfwd['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfwd["source_id"] = pd.to_numeric(dfwd["source_id"],errors='coerce')
-
-
-    #===========================================================================
-    # wind speed convert
-
-    dfws = df[["observation_id","report_id","data_policy_licence","date_time",
-               "date_time_meaning","observation_duration","longitude","latitude",
-               "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
-               "observed_variable","secondary_variable","observation_value",
-               "value_significance","secondary_value","units","code_table",
-               "conversion_flag","location_method","location_precision",
-               "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
-               "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
-               "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
-               "exposure_of_sensor","original_precision","original_units",
-               "original_code_table","original_value","conversion_method",
-               "processing_code","processing_level","adjustment_id","traceability",
-               "advanced_qc","advanced_uncertainty","advanced_homogenisation",
-               "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]]
-
-    # change for each variable to convert to cdm compliant values
-    dfws["secondary_id"]=df["wind_speed_Source_Station_ID"].astype(str)
-    dfws['secondary_id'] = dfws['secondary_id'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfws["observation_value"]=df["wind_speed"]
-    dfws["source_id"]=df["wind_speed_Source_Code"]
-    dfws["Seconds"]="00"
-    dfws["quality_flag"]=df["wind_speed_QC_flag"]
-    dfws["conversion_flag"]="2"
-    dfws["conversion_method"]=""
-    dfws["numerical_precision"]="0.1"
-    dfws["original_precision"]="0.1"
-    dfws["original_units"]="731"
-    dfws["original_value"]=df["wind_speed"]
-    dfws["observation_height_above_station_surface"]="10"
-    dfws["units"]="731"
-    dfws["observed_variable"]="107"
-
-
-    # set quality flag from df master for variable and fill all nan with Null then change all non-nan to
-    dfws.loc[dfws['quality_flag'].notnull(), "quality_flag"] = 1
-    dfws = dfws.fillna("Null")
-    dfws.quality_flag[dfws.quality_flag == "Null"] = 0
-    # change for each variable if required
-
-    # remove unwanted mising data rows
-    dfws = dfws.fillna("null")
-    dfws = dfws.replace({"null":"-99999"})
-    dfws = dfws[dfws.observation_value != -99999]
-    dfws = dfws.dropna(subset=['secondary_id'])
-    dfws = dfws.dropna(subset=['observation_value'])
-    dfws["source_id"] = pd.to_numeric(dfws["source_id"],errors='coerce')
-
-    # concatenate columns for joining df for next step
-    dfws['source_id'] = dfws['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfws['primary_station_id_2']=dfws['secondary_id'].astype(str)+'-'+dfws['source_id'].astype(str)
-    dfws["observation_value"] = pd.to_numeric(dfws["observation_value"],errors='coerce')
-
-    # add data policy and record number to df
-    df2 = pd.read_csv(utils.SUBDAILY_STATION_RECORD_ENTRIES_OBS_LITE, encoding='latin-1')
-    dfws = dfws.astype(str)
-    df2 = df2.astype(str)
-    dfws= df2.merge(dfws, on=['primary_station_id_2'])
-    dfws['data_policy_licence'] = dfws['data_policy_licence_x']
-    dfws['data_policy_licence'] = dfws['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfws['observation_id']=dfws['primary_station_id'].astype(str)+'-'+dfws['record_number'].astype(str)+'-'+dfws['date_time'].astype(str)
-    dfws['observation_id'] = dfws['observation_id'].str.replace(r' ', '-')
-
-    # remove unwanted last twpo characters
-    dfws['observation_id'] = dfws['observation_id'].str[:-6]
-    dfws["observation_id"]=dfws["observation_id"]+'-'+dfws['observed_variable'].astype(str)+'-'+dfws['value_significance'].astype(str)
-    dfws["report_id"]=dfws["observation_id"].str[:-7]
-
-
-    dfws = dfws[["observation_id","report_id","data_policy_licence","date_time",
-                 "date_time_meaning","observation_duration","longitude","latitude",
-                 "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
-                 "observed_variable","secondary_variable","observation_value",
-                 "value_significance","secondary_value","units","code_table",
-                 "conversion_flag","location_method","location_precision",
-                 "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
-                 "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
-                 "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
-                 "exposure_of_sensor","original_precision","original_units",
-                 "original_code_table","original_value","conversion_method",
-                 "processing_code","processing_level","adjustment_id","traceability",
-                 "advanced_qc","advanced_uncertainty","advanced_homogenisation",
-                 "advanced_assimilation_feedback","source_id"]]
-
-    # make sure no decimal places an dround value to reuqred decimal places
-    dfws.dropna(subset = ["observation_value"], inplace=True)
-    dfws['source_id'] = dfws['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfws['data_policy_licence'] = dfws['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0',''))
-    dfws["source_id"] = pd.to_numeric(dfws["source_id"],errors='coerce')
-    dfws["observation_value"] = pd.to_numeric(dfws["observation_value"],errors='coerce')
-    dfws["observation_value"]= dfws["observation_value"].round(2)
-    # merge all df into one cdmlite file
-    merged_df=pd.concat([dfdpt,dft,dfslp,dfmslp,dfwd,dfws], axis=0)
-    del dfdpt
-    del dft
-    del dfslp
-    del dfmslp
-    del dfwd
-    del dfws
-
-    merged_df.sort_values("date_time")
-
-    merged_df["latitude"] = pd.to_numeric(merged_df["latitude"],errors='coerce')
-    merged_df["longitude"] = pd.to_numeric(merged_df["longitude"],errors='coerce')
-    merged_df["latitude"]= merged_df["latitude"].round(3)
-    merged_df["longitude"]= merged_df["longitude"].round(3)
+    # remove unwanted missing data rows
+    dfmslp = h_utils.remove_missing_data_rows(dfmslp, "sea_level_pressure")
-    # Save CDM obs table to directory
-    try:
-        unique_variables = merged_df['observed_variable'].unique()
-        print(unique_variables)
-        merged_df.to_csv(cdmobs_outfile, index=False, sep="|")
-        print(f" {cdmobs_outfile}")
-    except:
-        continue
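+    # add_data_policy merges on primary_station_id_2, so the key assembled in the
+    # next step must be "<secondary_id>-<source_id>", exactly as it appears in the
+    # station record-entries table.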
+    # concatenate columns for joining df for next step
+    dfmslp['source_id'] = dfmslp['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
+    dfmslp['primary_station_id_2']=dfmslp['secondary_id'].astype(str)+'-'+dfmslp['source_id'].astype(str)
+    # add data policy and record number to df
+    dfmslp = h_utils.add_data_policy(dfmslp, data_policy_df)
-    # to next file in the loop
+    # Create observation_id field
+    dfmslp = h_utils.construct_obs_id(dfmslp)
+
+    dfmslp["report_id"]=dfmslp["observation_id"].str[:-6]
+
+    dfmslp = dfmslp[["observation_id","report_id","data_policy_licence","date_time",
+                     "date_time_meaning","observation_duration","longitude","latitude",
+                     "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
+                     "observed_variable","secondary_variable","observation_value",
+                     "value_significance","secondary_value","units","code_table",
+                     "conversion_flag","location_method","location_precision",
+                     "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
+                     "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
+                     "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
+                     "exposure_of_sensor","original_precision","original_units",
+                     "original_code_table","original_value","conversion_method",
+                     "processing_code","processing_level","adjustment_id","traceability",
+                     "advanced_qc","advanced_uncertainty","advanced_homogenisation",
+                     "advanced_assimilation_feedback","source_id"]]
+
+    dfmslp['observation_value'] = dfmslp['observation_value'].map(float)
+    dfmslp['observation_value'] = (dfmslp['observation_value']*100)
+    dfmslp['observation_value'] = dfmslp['observation_value'].map(int)
+
+    # Ensure correct number of decimal places
+    dfmslp = h_utils.fix_decimal_places(dfmslp, do_obs_value=True)
+
+    #========================================================================================================
+    # convert wind direction
+
+    dfwd = df[["observation_id","report_id","data_policy_licence","date_time",
+               "date_time_meaning","observation_duration","longitude","latitude",
+               "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
+               "observed_variable","secondary_variable","observation_value",
+               "value_significance","secondary_value","units","code_table",
+               "conversion_flag","location_method","location_precision",
+               "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
+               "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
+               "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
+               "exposure_of_sensor","original_precision","original_units",
+               "original_code_table","original_value","conversion_method",
+               "processing_code","processing_level","adjustment_id","traceability",
+               "advanced_qc","advanced_uncertainty","advanced_homogenisation",
+               "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]]
+
+    # change for each variable to convert to CDM compliant values
+    dfwd["observation_value"]=df["wind_direction"]
+    dfwd["original_value"]=df["wind_direction"]
+
+    dfwd = h_utils.construct_extra_ids(dfwd, df, "wind_direction")
+
+    dfwd = overwrite_conversion_precision_info(dfwd, "wind_direction")
+
+    # Extract QC information
+    dfwd = h_utils.extract_qc_info(dfwd, df, "wind_direction")
+
+    # Change for each variable if required
+    dfwd = h_utils.overwrite_variable_info(dfwd, "wind_direction")
+
+    # remove unwanted missing data rows
+    dfwd = h_utils.remove_missing_data_rows(dfwd, "wind_direction")
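+    # Note: MISSING_DATA["wind_direction"] is -999 (not -99999), so that is the
+    # sentinel remove_missing_data_rows filters on here.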
+    # concatenate columns for joining df for next step
+    dfwd['source_id'] = dfwd['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
+    dfwd['primary_station_id_2']=dfwd['secondary_id'].astype(str)+'-'+dfwd['source_id'].astype(str)
+    dfwd["observation_value"] = pd.to_numeric(dfwd["observation_value"], errors='coerce')
+
+    # add data policy and record number to df
+    dfwd = h_utils.add_data_policy(dfwd, data_policy_df)
+
+    # Create observation_id field
+    dfwd = h_utils.construct_obs_id(dfwd)
+
+    dfwd["report_id"]=dfwd["observation_id"].str[:-7] # WD code "106" is 3 digits, so the suffix is 7 characters
+
+    dfwd = dfwd[["observation_id","report_id","data_policy_licence","date_time",
+                 "date_time_meaning","observation_duration","longitude","latitude",
+                 "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
+                 "observed_variable","secondary_variable","observation_value",
+                 "value_significance","secondary_value","units","code_table",
+                 "conversion_flag","location_method","location_precision",
+                 "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
+                 "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
+                 "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
+                 "exposure_of_sensor","original_precision","original_units",
+                 "original_code_table","original_value","conversion_method",
+                 "processing_code","processing_level","adjustment_id","traceability",
+                 "advanced_qc","advanced_uncertainty","advanced_homogenisation",
+                 "advanced_assimilation_feedback","source_id"]]
+
+    # make sure no decimal places and round value to required decimal places
+    dfwd.dropna(subset = ["observation_value"], inplace=True)
+
+    # Ensure correct number of decimal places
+    dfwd = h_utils.fix_decimal_places(dfwd, do_obs_value=False)
+
+    #dfwd['observation_value'] = dfwd['observation_value'].astype(str).apply(lambda x: x.replace('.0',''))
+    #dfwd['source_id'] = dfwd['source_id'].astype(str).apply(lambda x: x.replace('.0',''))
+    #dfwd['data_policy_licence'] = dfwd['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0',''))
+    #dfwd["source_id"] = pd.to_numeric(dfwd["source_id"],errors='coerce')
+    # missing Obs_value
+
+    #===========================================================================
+    # convert wind speed
+
+    dfws = df[["observation_id","report_id","data_policy_licence","date_time",
+               "date_time_meaning","observation_duration","longitude","latitude",
+               "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface",
+               "observed_variable","secondary_variable","observation_value",
+               "value_significance","secondary_value","units","code_table",
+               "conversion_flag","location_method","location_precision",
+               "z_coordinate_method","bbox_min_longitude","bbox_max_longitude",
+               "bbox_min_latitude","bbox_max_latitude","spatial_representativeness",
+               "quality_flag","numerical_precision","sensor_id","sensor_automation_status",
+               "exposure_of_sensor","original_precision","original_units",
+               "original_code_table","original_value","conversion_method",
+               "processing_code","processing_level","adjustment_id","traceability",
+               "advanced_qc","advanced_uncertainty","advanced_homogenisation",
+               "advanced_assimilation_feedback","source_id","primary_station_id","secondary_id"]]
+
+    # change for each variable to convert to CDM compliant values
+    dfws["observation_value"]=df["wind_speed"]
+    dfws["original_value"]=df["wind_speed"]
+    dfws["secondary_id"]=df["wind_speed_Source_Station_ID"].astype(str)
+    dfws['secondary_id'] = dfws['secondary_id'].astype(str).apply(lambda x: x.replace('.0',''))
+
+    dfws = h_utils.construct_extra_ids(dfws, df, "wind_speed")
+
+    dfws = overwrite_conversion_precision_info(dfws, "wind_speed")
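+    # extract_qc_info collapses wind_speed_QC_flag to a binary quality_flag:
+    # any non-null QC entry becomes 1, everything else 0.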
"wind_speed") + + # Change for each variable if required + dfws = h_utils.overwrite_variable_info(dfws, "wind_speed") + + # remove unwanted mising data rows + dfws = h_utils.remove_missing_data_rows(dfws, "wind_speed") + + # concatenate columns for joining df for next step + dfws['source_id'] = dfws['source_id'].astype(str).apply(lambda x: x.replace('.0','')) + dfws['primary_station_id_2']=dfws['secondary_id'].astype(str)+'-'+dfws['source_id'].astype(str) + dfws["observation_value"] = pd.to_numeric(dfws["observation_value"],errors='coerce') + + # add data policy and record number to df + dfws = h_utils.add_data_policy(dfws, data_policy_df) + + # Create observation_id field + dfws = h_utils.construct_obs_id(dfws) + + dfws["report_id"] = dfws["observation_id"].str[:-7] # WS is 3 digits + + + dfws = dfws[["observation_id","report_id","data_policy_licence","date_time", + "date_time_meaning","observation_duration","longitude","latitude", + "crs","z_coordinate","z_coordinate_type","observation_height_above_station_surface", + "observed_variable","secondary_variable","observation_value", + "value_significance","secondary_value","units","code_table", + "conversion_flag","location_method","location_precision", + "z_coordinate_method","bbox_min_longitude","bbox_max_longitude", + "bbox_min_latitude","bbox_max_latitude","spatial_representativeness", + "quality_flag","numerical_precision","sensor_id","sensor_automation_status", + "exposure_of_sensor","original_precision","original_units", + "original_code_table","original_value","conversion_method", + "processing_code","processing_level","adjustment_id","traceability", + "advanced_qc","advanced_uncertainty","advanced_homogenisation", + "advanced_assimilation_feedback","source_id"]] + + dfws.dropna(subset = ["observation_value"], inplace=True) + + # Ensure correct number of decimal places + dfws = h_utils.fix_decimal_places(dfws, do_obs_value=True) + + # ================================================================================= + # Merge all dataframes into one CDMlite frame + merged_df = pd.concat([dfdpt,dft,dfslp,dfmslp,dfwd,dfws], axis=0) + del dfdpt + del dft + del dfslp + del dfmslp + del dfwd + del dfws + + # Sort by date/times and fix metadata + merged_df.sort_values("date_time", inplace=True) + merged_df["latitude"] = pd.to_numeric(merged_df["latitude"],errors='coerce') + merged_df["longitude"] = pd.to_numeric(merged_df["longitude"],errors='coerce') + merged_df["latitude"] = merged_df["latitude"].round(3) + merged_df["longitude"] = merged_df["longitude"].round(3) + + # Save CDM obs table to directory + try: + unique_variables = merged_df['observed_variable'].unique() + print(unique_variables) + merged_df.to_csv(cdmobs_outfile, index=False, sep="|") + print(f" {cdmobs_outfile}") + + except IOError: + # something wrong with file paths, despite checking + print(f"Cannot save datafile: {cdmlite_outfile}") + except RuntimeError: + print("Runtime error") + # TODO add logging for these errors + # to next file in the loop -# return # main + return # main - #**************************************** +#**************************************** if __name__ == "__main__": import argparse diff --git a/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_utils.py b/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_utils.py new file mode 100644 index 0000000..5fc7c03 --- /dev/null +++ b/PYTHON_CDM_Conversion_code/hourly_qff_to_cdm_utils.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Subroutines for sub-daily QFF to CDM conversion scripts + 
+
+@author: rjhd2
+"""
+
+import pandas as pd
+
+HEIGHTS = {
+    "temperature" : "2",
+    "dew_point_temperature" : "2",
+    "station_level_pressure" : "2",
+    "sea_level_pressure" : "2",
+    "wind_direction" : "10",
+    "wind_speed" : "10",
+}
+UNITS = {
+    "temperature" : "5",
+    "dew_point_temperature" : "5",
+    "station_level_pressure" : "32",
+    "sea_level_pressure" : "32",
+    "wind_direction" : "320",
+    "wind_speed" : "731",
+}
+VARIABLE_ID = {
+    "temperature" : "85",
+    "dew_point_temperature" : "36",
+    "station_level_pressure" : "57",
+    "sea_level_pressure" : "58",
+    "wind_direction" : "106",
+    "wind_speed" : "107",
+}
+MISSING_DATA = {
+    "temperature" : -99999,
+    "dew_point_temperature" : -99999,
+    "station_level_pressure" : -99999,
+    "sea_level_pressure" : -99999,
+    "wind_direction" : -999,
+    "wind_speed" : -999,
+}
+
+
+def construct_extra_ids(var_frame, all_frame, var_name):
+    """
+    Construct source_id and secondary_id fields
+
+    var_frame : `dataframe`
+        Dataframe for variable
+
+    all_frame : `dataframe`
+        Dataframe for station
+
+    var_name : `str`
+        Name of variable whose source columns are used to construct the ID fields
+    """
+
+    var_frame["source_id"] = all_frame[f"{var_name}_Source_Code"]
+    var_frame["secondary_id"] = all_frame[f"{var_name}_Source_Station_ID"].astype('str')
+    var_frame['secondary_id'] = var_frame['secondary_id'].astype(str).apply(lambda x: x.replace('.0', ''))
+
+    return var_frame
+
+def extract_qc_info(var_frame, all_frame, var_name, do_report_id=False):
+    """
+    Extract QC information for the QC tables
+
+    var_frame : `dataframe`
+        Dataframe for variable
+
+    all_frame : `dataframe`
+        Dataframe for station
+
+    var_name : `str`
+        Name of variable to use to extract QC information
+
+    do_report_id : `bool`
+        Process the report_id field too (CDM Lite only)
+    """
+
+    var_frame["quality_flag"] = all_frame[f"{var_name}_QC_flag"]
+    var_frame["qc_method"] = var_frame["quality_flag"]
+    if do_report_id:
+        # CDM Lite version has report_id entry here (not needed for OBS)
+        var_frame["report_id"] = var_frame["date_time"]
+
+    # Set quality flag from master dataframe for variable
+    # and fill all nan with Null then change all nonnan to 1
+    var_frame.loc[var_frame['quality_flag'].notnull(), "quality_flag"] = 1
+    var_frame = var_frame.fillna("Null")
+    var_frame.loc[var_frame.quality_flag == "Null", "quality_flag"] = 0
+
+    return var_frame
+
+def overwrite_variable_info(var_frame, var_name):
+    """
+    Replace information for variable with CDM codes
+
+    var_frame : `dataframe`
+        Dataframe for variable
+
+    var_name : `str`
+        Name of variable
+    """
+
+    var_frame["observation_height_above_station_surface"] = HEIGHTS[var_name]
+    var_frame["units"] = UNITS[var_name]
+    var_frame["observed_variable"] = VARIABLE_ID[var_name]
+
+    return var_frame
+
+def remove_missing_data_rows(var_frame, var_name):
+    """
+    Remove rows with no data
+
+    var_frame : `dataframe`
+        Dataframe for variable
+
+    var_name : `str`
+        Name of variable
+    """
+
+    var_frame = var_frame.fillna("null")
+    var_frame = var_frame.replace({"null" : f"{MISSING_DATA[var_name]}"})
+    var_frame = var_frame[var_frame.observation_value != MISSING_DATA[var_name]]
+    var_frame = var_frame.dropna(subset=['secondary_id'])
+    var_frame = var_frame.dropna(subset=['observation_value'])
+    var_frame["source_id"] = pd.to_numeric(var_frame["source_id"], errors='coerce')
+
+    return var_frame
+
+
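+# Note on add_data_policy below: pandas' merge defaults to an inner join, so
+# rows whose "<secondary_id>-<source_id>" key is absent from the policy table
+# are dropped at that step.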
+def add_data_policy(var_frame, policy_frame):
+    """
+    Merge in data policy information from another dataframe
+
+    var_frame : `dataframe`
+        Dataframe for variable
+
+    policy_frame : `dataframe`
+        Dataframe for the data policy
+    """
+
+    var_frame = var_frame.astype(str)
+
+    # merge policy frame into var_frame
+    var_frame = policy_frame.merge(var_frame, on=['primary_station_id_2'])
+
+    # rename column and remove ".0"
+    var_frame['data_policy_licence'] = var_frame['data_policy_licence_x']
+
+    var_frame['data_policy_licence'] = var_frame['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0',''))
+
+    return var_frame
+
+
+def construct_obs_id(var_frame):
+    """
+    construct `observation_id` field
+
+    var_frame : `dataframe`
+        Dataframe for variable
+    """
+
+    # concatenate columns
+    var_frame['observation_id'] = var_frame['primary_station_id'].astype(str) + "-" + \
+                                  var_frame['record_number'].astype(str) + "-" + \
+                                  var_frame['date_time'].astype(str)
+
+    var_frame['observation_id'] = var_frame['observation_id'].str.replace(r' ', '-')
+
+    # Remove unwanted last six characters (presumably the "+00:00" timezone suffix)
+    var_frame['observation_id'] = var_frame['observation_id'].str[:-6]
+    var_frame["observation_id"] = var_frame["observation_id"] + "-" + \
+                                  var_frame['observed_variable'].astype(str) + "-" + \
+                                  var_frame['value_significance'].astype(str)
+
+    return var_frame
+
+
+def fix_decimal_places(var_frame, do_obs_value=True):
+    """
+    Make sure no decimal places remain
+    or round value to required number of decimal places
+
+    var_frame : `dataframe`
+        Dataframe for variable
+    """
+
+    # remove the decimal places by editing string
+    var_frame['source_id'] = var_frame['source_id'].astype(str).apply(lambda x: x.replace('.0', ''))
+    var_frame["source_id"] = pd.to_numeric(var_frame["source_id"], errors='coerce')
+
+    # remove decimal places by editing string
+    var_frame['data_policy_licence'] = var_frame['data_policy_licence'].astype(str).apply(lambda x: x.replace('.0', ''))
+
+    if do_obs_value:
+        # Convert to float to allow rounding
+        var_frame["observation_value"] = pd.to_numeric(var_frame["observation_value"], errors='coerce')
+        var_frame["observation_value"] = var_frame["observation_value"].round(2)
+
+    return var_frame
+
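+# Illustrative worked example for construct_obs_id (hypothetical values):
+# primary_station_id "AAA111", record_number 1,
+# date_time "2010-01-01 04:00:00+00:00", observed_variable 85 and
+# value_significance 12 give "AAA111-1-2010-01-01-04:00:00-85-12";
+# the CDM lite report_id is then this string with the trailing
+# "-85-12" removed (str[:-6]).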