Commit 949e585

manually merged from fix_issue_203a, tests pass locally

kkappler committed Sep 2, 2023
1 parent 98f3862 commit 949e585
Showing 5 changed files with 66 additions and 43 deletions.
45 changes: 14 additions & 31 deletions aurora/pipelines/process_mth5.py
@@ -135,8 +135,7 @@ def process_tf_decimation_level(
def export_tf(
tf_collection,
channel_nomenclature,
station_metadata_dict={},
survey_dict={},
survey_metadata
):
"""
This method may wind up being embedded in the TF class
@@ -154,7 +153,6 @@ def export_tf(
tf_cls: mt_metadata.transfer_functions.core.TF
Transfer function container
"""
from mt_metadata.utils.list_dict import ListDict

merged_tf_dict = tf_collection.get_merged_dict(channel_nomenclature)
channel_nomenclature_dict = channel_nomenclature.to_dict()["channel_nomenclature"]
@@ -173,9 +171,9 @@ def export_tf(
res_cov = res_cov.rename(renamer_dict)
tf_cls.residual_covariance = res_cov

tf_cls.station_metadata._runs = ListDict()
tf_cls.station_metadata.from_dict(station_metadata_dict)
tf_cls.survey_metadata.from_dict(survey_dict)
# Set key as first element of dict; not currently supporting mixed surveys in TF
tf_cls.survey_metadata = survey_metadata

return tf_cls

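As an aside, the channel renaming applied to residual_covariance above uses standard xarray rename semantics. A minimal standalone sketch follows; the renamer_dict contents here are assumed, since in export_tf they derive from channel_nomenclature:

import numpy as np
import xarray as xr

# Assumed mapping; export_tf builds this from the channel nomenclature
renamer_dict = {"hx": "h1", "hy": "h2"}
res_cov = xr.DataArray(
    np.eye(2),
    coords={"hx": [0, 1], "hy": [0, 1]},
    dims=("hx", "hy"),
)
res_cov = res_cov.rename(renamer_dict)
print(res_cov.dims)  # ('h1', 'h2')
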
def enrich_row(row):
@@ -305,13 +303,11 @@ def station_obj_from_row(row):
tf_dict = {}

for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()):
# Apply STFT to all runs
tfk.update_dataset_df(i_dec_level)

local_stfts = []
remote_stfts = []

# NOT sure this is needed if FCs already exist ... although there is no harm in doing it
tfk.update_dataset_df(i_dec_level)

# TFK 1: get clock-zero from data if needed
if dec_level_config.window.clock_zero_type == "data start":
dec_level_config.window.clock_zero = str(tfk.dataset_df.start.min())
@@ -339,7 +335,7 @@ def station_obj_from_row(row):
stft_obj = make_stft_objects(
tfk.config, i_dec_level, run_obj, run_xrds, units, row.station_id
)
# FC packing goes here
# Pack FCs into h5
if dec_level_config.save_fcs:
if dec_level_config.save_fcs_type == "csv":
print("WARNING: Unless you are debugging or running the tests, saving FCs to csv is unexpected")
@@ -354,13 +350,7 @@ def station_obj_from_row(row):
raise NotImplementedError("See Note #1 at top this method")
fc_group = station_obj.fourier_coefficients_group.add_fc_group(run_obj.metadata.id)
# See Technical Note posted on Aurora Issue #278, Sept 1, 2023

# Previously: fc_decimation_level = fc_group.add_decimation_level(f"{i_dec_level}")
# Now, make a dummy FC decimation level to use as a skeleton
dummy_fc_decimation_level = fc_group.add_decimation_level("-1")
fc_group.remove_decimation_level("-1")
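# Adding then removing the placeholder "-1" level yields a skeleton
# decimation-level object (presumably a metadata template for the real
# levels; see the Technical Note on Aurora Issue #278).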
@@ -420,25 +410,18 @@ def station_obj_from_row(row):
tfk_dataset.close_mths_objs()
return tf_collection
else:
local_station_id = tfk.config.stations.local.id
station_metadata = tfk_dataset.get_station_metadata(local_station_id)
local_mth5_obj = tfk.mth5_objs[local_station_id]

if local_mth5_obj.file_version == "0.1.0":
survey_dict = local_mth5_obj.survey_group.metadata.to_dict()
elif local_mth5_obj.file_version == "0.2.0":
# this could be a method of tf_kernel.get_survey_dict()
survey_id = tfk.dataset_df[
tfk.dataset_df["station_id"] == local_station_id

# get the unique survey id that is not a remote reference
survey_id = tfk_dataset.df.loc[
tfk_dataset.df.remote == False
].survey.unique()[0]
survey_obj = local_mth5_obj.get_survey(survey_id)
survey_dict = survey_obj.metadata.to_dict()
if survey_id in ["none"]:
survey_id = "0"

tf_cls = export_tf(
tf_collection,
tfk.config.channel_nomenclature,
station_metadata_dict=station_metadata.to_dict(),
survey_dict=survey_dict,
survey_metadata=tfk_dataset.survey_metadata[survey_id],
)
tfk_dataset.close_mths_objs()
return tf_cls
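
The survey-id lookup in the block above is a plain pandas filter; a standalone sketch, with column names taken from the diff and sample values hypothetical:

import pandas as pd

df = pd.DataFrame(
    {
        "station_id": ["test1", "test2"],
        "remote": [False, True],
        "survey": ["EMTF Synthetic", "EMTF Synthetic"],
    }
)
# get the unique survey id that is not a remote reference
survey_id = df.loc[df.remote == False].survey.unique()[0]
if survey_id in ["none"]:
    survey_id = "0"  # normalize the placeholder survey id
print(survey_id)  # EMTF Synthetic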
3 changes: 1 addition & 2 deletions aurora/pipelines/transfer_function_kernel.py
@@ -263,7 +263,7 @@ def initialize_mth5s(self, mode="r"):
self._mth5_objs = mth5_objs
return

def update_dataset_df(self,i_dec_level, fc_existence_info=None):
def update_dataset_df(self, i_dec_level):
"""
This function has two different modes. The first mode initializes values in the
array, and could be placed into TFKDataset.initialize_time_series_data()
@@ -302,7 +302,6 @@ def update_dataset_df(self,i_dec_level, fc_existence_info=None):
# APPLY TIMING CORRECTIONS HERE
else:
print(f"DECIMATION LEVEL {i_dec_level}")
print("UNDER CONSTRUCTION 20230902 -- Need to skip if FC TRUE")
# See Note 1 top of module
# See Note 2 top of module
for i, row in self.dataset_df.iterrows():
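The two modes described in the docstring can be illustrated with a self-contained toy. This sketch is hypothetical and simplified -- the real method loads runs from mth5 and decimates xarray datasets:

import numpy as np
import pandas as pd
import scipy.signal as ssig

def update_dataset_df_toy(df, i_dec_level, factor=4):
    if i_dec_level == 0:
        # Mode 1: initialize per-row time series (random stand-ins for run data)
        df["data"] = [np.random.randn(4096) for _ in range(len(df))]
    else:
        # Mode 2: decimate the previous level's data in place
        df["data"] = [ssig.decimate(x, factor) for x in df["data"]]
    return df

df = update_dataset_df_toy(pd.DataFrame({"run": ["001", "002"]}), 0)
df = update_dataset_df_toy(df, 1)  # each row now holds 1024 samples
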
26 changes: 25 additions & 1 deletion aurora/transfer_function/kernel_dataset.py
@@ -117,6 +117,7 @@ def __init__(self, **kwargs):
"end",
"duration",
]
self.survey_metadata = {}

def clone(self):
return copy.deepcopy(self)
@@ -351,15 +352,38 @@ def initialize_dataframe_for_processing(self, mth5_objs):
row.station_id, row.run_id, survey=row.survey
)
self.df["run_reference"].at[i] = run_obj.hdf5_group.ref

# but what if RR is from another survey?
# NEED TO MAKE THIS WORK WITH RUN_OBJ, NOT RUN_TS, ONLY DIFF IS ID NOT ASSIGNED METHINKS
# THAT WAY WE WONT NEED TO
# if i == 0:
# if run_ts.survey_metadata.id in self.survey_metadata.keys():
# pass
# self.survey_metadata[run_ts.survey_metadata.id] = run_ts.survey_metadata
# elif i > 0:
# self.survey_metadata[run_ts.survey_metadata.id].stations[0].add_run(run_ts.run_metadata)
# if len(self.survey_metadata.keys()) > 1:
# raise NotImplementedError

if row.fc:
msg = f"row {row} already has fcs prescribed by processing confg "
msg += "-- skipping time series initialzation"
print(msg)
continue
# continue
# the line below is not lazy, See Note #2
run_ts = run_obj.to_runts(start=row.start, end=row.end)
xr_ds = run_ts.dataset
self.df["run_dataarray"].at[i] = xr_ds.to_array("channel")

if i == 0:
if run_ts.survey_metadata.id in self.survey_metadata.keys():
pass
self.survey_metadata[run_ts.survey_metadata.id] = run_ts.survey_metadata
elif i > 0:
self.survey_metadata[run_ts.survey_metadata.id].stations[0].add_run(run_ts.run_metadata)
if len(self.survey_metadata.keys()) > 1:
raise NotImplementedError

print("DATASET DF POPULATED")

def add_columns_for_processing(self, mth5_objs):
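The survey-metadata accumulation added above is keyed on row index (i == 0 vs i > 0); keying on dict membership instead reads more directly and drops the no-op pass branch. A sketch, assuming mt_metadata-style objects exposing .id and .stations[0].add_run():

def accumulate_survey_metadata(survey_metadata, run_ts):
    # survey_metadata: dict mapping survey id -> survey metadata object
    sid = run_ts.survey_metadata.id
    if sid not in survey_metadata:
        survey_metadata[sid] = run_ts.survey_metadata
    else:
        survey_metadata[sid].stations[0].add_run(run_ts.run_metadata)
    if len(survey_metadata) > 1:
        raise NotImplementedError("mixed surveys in one TF are not supported")
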
4 changes: 2 additions & 2 deletions tests/synthetic/test_fourier_coefficients.py
@@ -88,8 +88,8 @@ def test_123(self):
read_back_fcs(mth5_path)
# Confirm the file still processes fine with the fcs inside
# This is currently failing due to tf_kernel checking if fcs already exist, NotImplementedError
# tfc = process_mth5(processing_config, tfk_dataset=tfk_dataset)
#return tfc
tfc = process_mth5(processing_config, tfk_dataset=tfk_dataset)
return tfc
print("OK")
print("NEXT STEP is add a Tap-Point into existing processing to create these levels")
print("NEXT STEP AFTER THAT is to try processing data from the FC LEVEL")
31 changes: 24 additions & 7 deletions tests/synthetic/test_processing.py
@@ -41,6 +41,10 @@ def test_no_crash_with_too_many_decimations(self):
config_keyword="test1_tfk", z_file_path=z_file_path
)
tf_cls.write(fn=xml_file_name, file_type="emtfxml")
tf_cls.write(
fn=z_file_path.parent.joinpath(f"{z_file_path.stem}_from_tf.zss"),
file_type="zss",
)

xml_file_base = "syn1r2_tfk.xml"
xml_file_name = AURORA_RESULTS_PATH.joinpath(xml_file_base)
@@ -72,6 +76,10 @@ def test_can_use_mth5_file_version_020(self):
xml_file_base = f"syn1_mth5v{file_version}.xml"
xml_file_name = AURORA_RESULTS_PATH.joinpath(xml_file_base)
tf_cls.write(fn=xml_file_name, file_type="emtfxml")
tf_cls.write(
fn=z_file_path.parent.joinpath(f"{z_file_path.stem}_from_tf.zss"),
file_type="zss",
)

def test_can_use_scale_factor_dictionary(self):
"""
@@ -91,7 +99,11 @@ def test_can_use_scale_factor_dictionary(self):
z_file_path=z_file_path,
test_scale_factor=True,
)
assert tf_cls.transfer_function.data.shape == (25, 3, 2)
tf_cls.write(
fn=z_file_path.parent.joinpath(f"{z_file_path.stem}_from_tf.zss"),
file_type="zss",
)


def test_simultaneous_regression(self):
z_file_path = AURORA_RESULTS_PATH.joinpath("syn1_simultaneous_estimate.zss")
@@ -101,6 +113,10 @@ def test_simultaneous_regression(self):
xml_file_base = "syn1_simultaneous_estimate.xml"
xml_file_name = AURORA_RESULTS_PATH.joinpath(xml_file_base)
tf_cls.write(fn=xml_file_name, file_type="emtfxml")
tf_cls.write(
fn=z_file_path.parent.joinpath(f"{z_file_path.stem}_from_tf.zss"),
file_type="zss",
)

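The zss-export block now recurs in several of these tests; it could be factored into a small helper, e.g. (hypothetical name):

def write_zss_copy(tf_cls, z_file_path):
    # Write a .zss copy of the TF next to the original z-file
    zss_path = z_file_path.parent.joinpath(f"{z_file_path.stem}_from_tf.zss")
    tf_cls.write(fn=zss_path, file_type="zss")
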
def test_can_process_other_station(self, remake_if_exists=True):
tf_cls = process_synthetic_2(remake_if_exists=remake_if_exists)
Expand Down Expand Up @@ -225,6 +241,7 @@ def process_synthetic_1(


def process_synthetic_2(remake_if_exists=True):
""" Rename remake_if_exists to force_make_mth5"""
station_id = "test2"
mth5_path = create_test2_h5(remake_if_exists=remake_if_exists)
mth5_paths = [
@@ -237,9 +254,9 @@ def process_synthetic_2(remake_if_exists=True):
processing_config = create_test_run_config(station_id, tfk_dataset)
for decimation_level in processing_config.decimations:
decimation_level.save_fcs = True
# decimation_level.save_fcs_type = "h5"
decimation_level.save_fcs_type = "csv"
tfc = process_mth5(processing_config, tfk_dataset=tfk_dataset)
decimation_level.save_fcs_type = "h5"
# decimation_level.save_fcs_type = "csv"
tfc = process_mth5(processing_config, tfk_dataset=tfk_dataset, z_file_path=AURORA_RESULTS_PATH.joinpath("test2q.zss"))
return tfc


@@ -271,9 +288,9 @@ def main():
"""
Testing the processing of synthetic data
"""
tmp = TestSyntheticProcessing()
tmp.setUp()
# process_synthetic_2(remake_if_exists=True)
# tmp = TestSyntheticProcessing()
# tmp.setUp()
# #process_synthetic_2(remake_if_exists=True)
# process_synthetic_2(remake_if_exists=False)
# tmp.test_can_process_other_station(remake_) # makes FC csvs
