
Commit

tdying
kkappler committed Sep 4, 2023
1 parent b723eb0 commit 5e8c637
Showing 1 changed file with 18 additions and 30 deletions.
48 changes: 18 additions & 30 deletions aurora/pipelines/transfer_function_kernel.py
@@ -12,22 +12,18 @@
from mt_metadata.transfer_functions.processing.aurora import Processing


def check_if_fcgroup_supports_processing_config(fc_group, processing_config, remote):
def fcgroup_supports_processing_config(fc_group, processing_config, remote):
"""
This could be made a method of mth5.groups.fourier_coefficients.FCGroup
As per Note #1 in check_if_fc_levels_already_exist(), this is an all-or-nothing check:
This is an "all-or-nothing" check:
Either every (valid) decimation level in the processing config is available or we will build all FCs.
Currently using a nested for loop, but this can be made a bit quicker by checking if sample_rates
are in agreement (if they aren't, we don't need to check any other parameters).
Also, once an fc_decimation_id is found to match a dec_level, we don't need to keep checking that fc_decimation_id
Note #1: The idea is to
Parameters
----------
fc_group
processing_config
fc_group: mth5.groups.fourier_coefficients.FCGroup
processing_config:
Returns
-------
@@ -36,25 +32,20 @@ def check_if_fcgroup_supports_processing_config(fc_group, processing_config, rem
fc_decimation_ids_to_check = fc_group.groups_list
levels_present = np.full(processing_config.num_decimation_levels, False)
for i, dec_level in enumerate(processing_config.decimations):
# See Note #1
#print(f"{i}")
#print(f"{dec_level}")

# All or nothing condition
if i > 0:
if not levels_present[i - 1]:
return False

# iterate over fc_group decimations
# This can be done smarter ... once an fc_decimation_id is found to
for fc_decimation_id in fc_decimation_ids_to_check:
fc_dec_group = fc_group.get_decimation_level(fc_decimation_id)
fc_decimation = fc_dec_group.metadata
levels_present[i] = fc_decimation.has_fcs_for_aurora_processing(dec_level, remote)

if levels_present[i]:
fc_decimation_ids_to_check.remove(fc_decimation_id) #no need to look at this one again
break #break inner loop
break #break inner for-loop over decimations


return levels_present.all()
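
A minimal sketch of the "all-or-nothing" semantics above, using plain lists of sample rates in place of the mth5 FC-group and processing-config objects (supports_all_levels and both of its arguments are illustrative names, not part of aurora):

import numpy as np

def supports_all_levels(available_sample_rates, required_sample_rates):
    # levels_present[i] is True once required level i is matched by a candidate
    levels_present = np.full(len(required_sample_rates), False)
    candidates = list(available_sample_rates)
    for i, sample_rate in enumerate(required_sample_rates):
        # All-or-nothing: a single missing level fails the whole check
        if i > 0 and not levels_present[i - 1]:
            return False
        if sample_rate in candidates:
            levels_present[i] = True
            candidates.remove(sample_rate)  # no need to look at this one again
    return levels_present.all()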
@@ -137,17 +128,13 @@ def initialize_mth5s(self, mode="r"):

def update_dataset_df(self,i_dec_level):
"""
This function has two different modes. The first mode, initializes values in the
This function has two different modes. The first mode initializes values in the
array, and could be placed into TFKDataset.initialize_time_series_data()
The second mode decimates. The function is kept in pipelines because it calls
time series operations.
Notes:
1. When iterating over dataframe, (i)ndex must run from 0 to len(df), otherwise
get indexing errors. Maybe reset_index() before main loop? or push reindexing
into TF Kernel, so that this method only gets a cleanly indexed df, restricted to
only the runs to be processed for this specific TF?
2. When assigning xarrays to dataframe cells, df dislikes xr.Dataset,
1. When assigning xarrays to dataframe cells, df dislikes xr.Dataset,
so we convert to DataArray before assignment
@@ -174,8 +161,7 @@ def update_dataset_df(self,i_dec_level):
# APPLY TIMING CORRECTIONS HERE
else:
print(f"DECIMATION LEVEL {i_dec_level}")
# See Note 1 top of module
# See Note 2 top of module

for i, row in self.dataset_df.iterrows():
if not self.is_valid_dataset(row, i_dec_level):
continue
@@ -187,7 +173,7 @@ def update_dataset_df(self,i_dec_level):
run_xrds = row["run_dataarray"].to_dataset("channel")
decimation = self.config.decimations[i_dec_level].decimation
decimated_xrds = prototype_decimate(decimation, run_xrds)
self.dataset_df["run_dataarray"].at[i] = decimated_xrds.to_array("channel")
self.dataset_df["run_dataarray"].at[i] = decimated_xrds.to_array("channel") # See Note 1 above

print("DATASET DF UPDATED")
return
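
Note 1 above reduces to the pattern below -- a self-contained sketch with synthetic data (the frame and its single column are illustrative, not aurora's own):

import numpy as np
import pandas as pd
import xarray as xr

run_xrds = xr.Dataset(
    {"ex": ("time", np.random.rand(8)), "hy": ("time", np.random.rand(8))}
)
df = pd.DataFrame({"run_dataarray": [None]})  # object-dtype column
# Assigning the Dataset itself to a cell fails; stacking the channels into
# one DataArray first, as update_dataset_df does, works:
df["run_dataarray"].at[0] = run_xrds.to_array("channel")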
@@ -224,7 +210,7 @@ def check_if_fc_levels_already_exist(self):
"""
groupby = ['survey', 'station_id', 'run_id',]
grouper = self.processing_summary.groupby(groupby)
# print(len(grouper))

for (survey_id, station_id, run_id), df in grouper:
cond1 = self.dataset_df.survey == survey_id
cond2 = self.dataset_df.station_id == station_id
@@ -233,13 +219,15 @@ def check_if_fc_levels_already_exist(self):
assert len(associated_run_sub_df) == 1 # should be unique
dataset_df_index = associated_run_sub_df.index[0]
run_row = associated_run_sub_df.iloc[0]
row_ssr_str = f"survey: {survey_id}, station_id: {station_id}, run_id: {run_id}"

# See Note #3 above
mth5_obj = self.mth5_objs[station_id]
survey_obj = mth5_obj.get_survey(survey_id)
station_obj = survey_obj.stations_group.get_station(station_id)
if not station_obj.fourier_coefficients_group.groups_list:
print("Prebuilt Fourier Coefficients not detected -- will need to build them ")
msg = f"Prebuilt Fourier Coefficients not detected for {row_ssr_str} -- will need to build them "
print(msg)
self.dataset_df["fc"].iat[dataset_df_index] = False
else:
print("Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements...")
@@ -253,19 +241,19 @@ def check_if_fc_levels_already_exist(self):

if len(fc_group.groups_list) < self.processing_config.num_decimation_levels:
self.dataset_df["fc"].iat[dataset_df_index] = False
print(f"Not enough FC Groups available -- will need to build them ")
print(f"Not enough FC Groups available for {row_ssr_str} -- will need to build them ")
continue

# Can check time periods here if desired, but unique (survey, station, run) should make this unneeded
# processing_run = self.processing_config.stations.local.get_run("001")
# processing_run = self.processing_config.stations.local.get_run(run_id)
# for tp in processing_run.time_periods:
# assert tp in fc_group time periods


# See note #2
fcs_already_there = check_if_fcgroup_supports_processing_config(fc_group,
self.processing_config,
run_row.remote)
fcs_already_there = fcgroup_supports_processing_config(fc_group,
self.processing_config,
run_row.remote)
self.dataset_df["fc"].iat[dataset_df_index] = fcs_already_there

return
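
The per-run bookkeeping above is a standard multi-column groupby plus a boolean-mask lookup; here is a self-contained sketch with synthetic frames (all column values are illustrative):

import pandas as pd

processing_summary = pd.DataFrame(
    {"survey": ["s1", "s1"], "station_id": ["MT01", "MT01"], "run_id": ["001", "002"]}
)
dataset_df = pd.DataFrame(
    {"survey": ["s1", "s1"], "station_id": ["MT01", "MT01"],
     "run_id": ["001", "002"], "fc": [None, None]}
)
grouper = processing_summary.groupby(["survey", "station_id", "run_id"])
for (survey_id, station_id, run_id), df in grouper:
    cond = (
        (dataset_df.survey == survey_id)
        & (dataset_df.station_id == station_id)
        & (dataset_df.run_id == run_id)
    )
    associated_run_sub_df = dataset_df[cond]
    assert len(associated_run_sub_df) == 1  # (survey, station, run) should be unique
    dataset_df["fc"].iat[associated_run_sub_df.index[0]] = False  # stand-in decision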
