Merge pull request #287 from simpeg/widescale_test
WideScaleTest class is now implemented in prototype form for stages 0,1,2,3,4,5 of the earthscope tests
kkappler authored Sep 19, 2023
2 parents f919ef9 + c2ce0bc commit bffd838
Showing 27 changed files with 9,315 additions and 1,269 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -3,6 +3,6 @@ ignore = E501, W605, W503, E203, F401, E722
#,E402, E203,E722
#F841, E402, E722
#ignore = E203, E266, E501, W503, F403, F401
max-line-length = 79
max-line-length = 120
max-complexity = 18
select = B,C,E,F,W,T4,B9
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -35,7 +35,7 @@ jobs:
conda install -c conda-forge pytest pytest-cov certifi">=2017.4.17" pandoc
pip install -r requirements-dev.txt
pip install git+https://github.com/kujaku11/mt_metadata.git@fcs
pip install git+https://github.com/kujaku11/mth5.git@fix_issue_157
pip install git+https://github.com/kujaku11/mth5.git@fc
- name: Install Our Package
run: |
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -3,8 +3,8 @@ repos:
rev: 22.6.0
hooks:
- id: black
language_version: python3.6
- repo: https://gitlab.com/pycqa/flake8
language_version: python3.10
- repo: https://github.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
23 changes: 23 additions & 0 deletions aurora/pipelines/process_mth5.py
@@ -288,10 +288,12 @@ def station_obj_from_row(row):
continue

run_xrds = row["run_dataarray"].to_dataset("channel")
print(f"DEBUG Issue 289: TS {row.station_id} {row.run_id} {run_xrds.time.shape} {row.start} {row.end}")
run_obj = row.mth5_obj.from_reference(row.run_reference)
stft_obj = make_stft_objects(
tfk.config, i_dec_level, run_obj, run_xrds, units, row.station_id
)
print(f"DEBUG Issue 289: FC {row.station_id} {row.run_id} {stft_obj.time.shape} {stft_obj.time.min()} {stft_obj.time.max()}")
# Pack FCs into h5
if dec_level_config.save_fcs:
if dec_level_config.save_fcs_type == "csv":
@@ -320,6 +322,26 @@ def station_obj_from_row(row):
remote_stfts.append(stft_obj)

# Merge STFTs

# Timing Error Workaround See Aurora Issue #289
if tfk.config.stations.remote:
n_chunks = len(local_stfts)
for i_chunk in range(n_chunks):
ok = local_stfts[i_chunk].time.shape == remote_stfts[i_chunk].time.shape
if not ok:
print(f"Mismatch in FC array lengths detected -- Issue #289")
glb = max(local_stfts[i_chunk].time.min(), remote_stfts[i_chunk].time.min())
lub = min(local_stfts[i_chunk].time.max(), remote_stfts[i_chunk].time.max())
cond1 = local_stfts[i_chunk].time >= glb
cond2 = local_stfts[i_chunk].time <= lub
local_stfts[i_chunk] = local_stfts[i_chunk].where(cond1 & cond2, drop=True)
cond1 = remote_stfts[i_chunk].time >= glb
cond2 = remote_stfts[i_chunk].time <= lub
remote_stfts[i_chunk] = remote_stfts[i_chunk].where(cond1 & cond2, drop=True)
assert (local_stfts[i_chunk].time.shape==remote_stfts[i_chunk].time.shape)



local_merged_stft_obj = xr.concat(local_stfts, "time")

if tfk.config.stations.remote:
@@ -373,5 +395,6 @@ def station_obj_from_row(row):
tfk.config.channel_nomenclature,
survey_metadata=tfk_dataset.survey_metadata[survey_id],
)
tf_cls.station_metadata.transfer_function.processing_type = tfk.processing_type()
tfk_dataset.close_mths_objs()
return tf_cls
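For context, a minimal standalone sketch of the Issue #289 workaround in the hunk above: it assumes two xarray Datasets that share a "time" coordinate and trims each to the time window they have in common, so their FC arrays line up before concatenation. The function and variable names below are illustrative, not part of aurora.

import numpy as np
import pandas as pd
import xarray as xr

def trim_to_common_time_window(local_stft, remote_stft):
    # Greatest lower bound / least upper bound of the two time axes.
    glb = max(local_stft.time.min(), remote_stft.time.min())
    lub = min(local_stft.time.max(), remote_stft.time.max())
    # Keep only the samples both datasets share, dropping everything outside the window.
    local_trimmed = local_stft.where((local_stft.time >= glb) & (local_stft.time <= lub), drop=True)
    remote_trimmed = remote_stft.where((remote_stft.time >= glb) & (remote_stft.time <= lub), drop=True)
    assert local_trimmed.time.shape == remote_trimmed.time.shape
    return local_trimmed, remote_trimmed

# Toy data: the remote series starts two samples later than the local one.
t0 = pd.date_range("2023-01-01 00:00:00", periods=10, freq="s")
t1 = pd.date_range("2023-01-01 00:00:02", periods=10, freq="s")
local = xr.Dataset({"hx": ("time", np.arange(10.0))}, coords={"time": t0})
remote = xr.Dataset({"hx": ("time", np.arange(10.0))}, coords={"time": t1})
local, remote = trim_to_common_time_window(local, remote)  # both now span the 8 shared samples
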
22 changes: 22 additions & 0 deletions aurora/pipelines/run_summary.py
@@ -111,6 +111,27 @@ def add_duration(self, df=None):
df["duration"] = durations
return

def check_runs_are_valid(self, drop=False, **kwargs):
""" kwargs can tell us what sorts of conditions to check, for example all_zero, there are nan, etc."""
check_for_all_zero_runs = True
for i_row, row in self.df.iterrows():
print(f"Checking row for zeros {row}")
m = mth5.mth5.MTH5()
m.open_mth5(row.mth5_path)
run_obj = m.get_run(row.station_id, row.run_id, row.survey)
runts = run_obj.to_runts()
if runts.dataset.to_array().data.__abs__().sum() == 0:
print("CRITICAL: Detected a run with all zero values")
self.df["valid"].at[i_row] = False
# load each run, and take the median of the sum of the absolute values
if drop:
self.drop_invalid_rows()
return

def drop_invalid_rows(self):
self.df = self.df[self.df.valid]
self.df.reset_index(drop=True, inplace=True)

# BELOW FUNCTION CAN BE COPIED FROM METHOD IN KernelDataset()
# def drop_runs_shorter_than(self, duration, units="s"):
# if units != "s":
@@ -227,6 +248,7 @@ def channel_summary_to_run_summary(
data_dict["input_channels"] = input_channels
data_dict["output_channels"] = output_channels
data_dict["channel_scale_factors"] = channel_scale_factors
data_dict["valid"] = True

run_summary_df = pd.DataFrame(data=data_dict)
if sortby:
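A minimal pandas-only sketch (not the RunSummary implementation itself) of what the new "valid" bookkeeping above amounts to: flag a run whose data are identically zero, then drop flagged rows and reset the index. The dataframe and the toy_data loader below are toy assumptions standing in for the MTH5 run access shown in the diff.

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "station_id": ["CAS04", "CAS04", "NVR08"],
        "run_id": ["001", "002", "001"],
        "valid": True,
    }
)
# Pretend time series for each run (run 002 is all zeros).
toy_data = {
    ("CAS04", "001"): np.random.randn(100),
    ("CAS04", "002"): np.zeros(100),
    ("NVR08", "001"): np.random.randn(100),
}

for i_row, row in df.iterrows():
    data = toy_data[(row.station_id, row.run_id)]
    if np.abs(data).sum() == 0:  # the all-zero check used in check_runs_are_valid
        df.at[i_row, "valid"] = False

# drop_invalid_rows equivalent
df = df[df.valid]
df.reset_index(drop=True, inplace=True)
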
49 changes: 40 additions & 9 deletions aurora/pipelines/transfer_function_kernel.py
@@ -143,7 +143,7 @@ def check_if_fc_levels_already_exist(self):
(Could also iterate over kernel_dataset.dataframe, to get the groupby).
If all FC Levels for a given station-run are already built, mark the RunSummary with a True in
the "fc" column.
the "fc" column. Otherwise its False
Note 1: Because decimation is a cascading operation, we avoid the case where some (valid) decimation
levels exist in the mth5 FC archive and others do not. The maximum granularity tolerated will be at the
@@ -163,8 +163,18 @@
of the dataset_df may reference the same h5, and I don't know if updating one row will have unintended
consequences.
Note #4: associated_run_sub_df may have multiple rows, even though the run id is unique.
This could happen for example when you have a long run at the local station, but multiple (say two) shorter runs
at the reference station. In that case, the processing summary will have a separate row for the
intersection of the long run with each of the remote runs. We ignore this for now, selecting only the first
element of the associated_run_sub_df, under the assumption that FCs have been created for the entire run,
or not at all. This assumption can be relaxed in future by using the time_period attribute of the FC layer.
For now, we proceed with the all-or-none logic. That is, if a ['survey', 'station_id', 'run_id',] has FCs,
assume that the FCs are present for the entire run. We assign the "fc" column of dataset_df to have the same
boolean value for all rows of same ['survey', 'station_id', 'run_id',] .
Returns: None
Modifies self.dataset_df inplace, assigning bool to the "fc" column
Modifies self.dataset_df inplace, assigning bools to the "fc" column
"""
groupby = ['survey', 'station_id', 'run_id',]
@@ -175,31 +185,36 @@
cond2 = self.dataset_df.station_id == station_id
cond3 = self.dataset_df.run_id == run_id
associated_run_sub_df = self.dataset_df[cond1 & cond2 & cond3]
assert len(associated_run_sub_df) == 1 # should be unique
dataset_df_index = associated_run_sub_df.index[0]

if len(associated_run_sub_df) > 1:
# See Note #4
print("Warning -- not all runs will processed as a continuous chunk -- in future may need to loop over runlets to check for FCs")

dataset_df_indices = np.r_[associated_run_sub_df.index]
#dataset_df_indices = associated_run_sub_df.index.to_numpy()
run_row = associated_run_sub_df.iloc[0]
row_ssr_str = f"survey: {run_row.survey}, station_id: {run_row.station_id}, run_id: {run_row.run_id}"

# See Note #3 above
# See Note #3 above relating to mixing multiple surveys in a processing scheme
mth5_obj = self.mth5_objs[station_id]
survey_obj = mth5_obj.get_survey(survey_id)
station_obj = survey_obj.stations_group.get_station(station_id)
if not station_obj.fourier_coefficients_group.groups_list:
msg = f"Prebuilt Fourier Coefficients not detected for {row_ssr_str} -- will need to build them "
print(msg)
self.dataset_df["fc"].iat[dataset_df_index] = False
self.dataset_df.loc[dataset_df_indices, "fc"] = False
else:
print("Prebuilt Fourier Coefficients detected -- checking if they satisfy processing requirements...")
# Assume FC Groups are keyed by run_id, check if there is a relevant group
try:
fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_id)
except MTH5Error:
self.dataset_df["fc"].iat[dataset_df_index] = False
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Run ID {run_id} not found in FC Groups, -- will need to build them ")
continue

if len(fc_group.groups_list) < self.processing_config.num_decimation_levels:
self.dataset_df["fc"].iat[dataset_df_index] = False
self.dataset_df.loc[dataset_df_indices, "fc"] = False
print(f"Not enough FC Groups available for {row_ssr_str} -- will need to build them ")
continue

Expand All @@ -212,7 +227,7 @@ def check_if_fc_levels_already_exist(self):
# See note #2
fcs_already_there = fc_group.supports_aurora_processing_config(self.processing_config,
run_row.remote)
self.dataset_df["fc"].iat[dataset_df_index] = fcs_already_there
self.dataset_df.loc[dataset_df_indices, "fc"] = fcs_already_there

return

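To illustrate the bookkeeping change in this hunk (see Note #4 in the docstring above): rows of dataset_df are grouped by (survey, station_id, run_id), and the "fc" flag is written to every row of the group at once via .loc, instead of assuming each group has exactly one row. A toy sketch, with a stand-in for the real supports_aurora_processing_config check:

import numpy as np
import pandas as pd

dataset_df = pd.DataFrame(
    {
        "survey": ["EMTF", "EMTF", "EMTF"],
        "station_id": ["CAS04", "NVR08", "NVR08"],
        "run_id": ["a", "b", "b"],  # remote run "b" intersects the local run twice (Note #4)
        "fc": False,
    }
)

for (survey_id, station_id, run_id), sub_df in dataset_df.groupby(["survey", "station_id", "run_id"]):
    dataset_df_indices = np.r_[sub_df.index]
    fcs_already_there = station_id == "CAS04"  # stand-in for the FC-group / processing-config check
    dataset_df.loc[dataset_df_indices, "fc"] = fcs_already_there  # same bool for all rows of the group
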
@@ -393,6 +408,22 @@ def is_valid_dataset(self, row, i_dec):
is_valid = processing_row.valid.iloc[0]
return is_valid

def processing_type(self):
"""
A description of the processing; it will get passed to the TF object,
and can be used for the Z-file
Could add a version or a hashtag to this
Could also check dataset_df
If remote.all==False append "Single Station"
"""
processing_type = "Aurora"
if self.dataset_df.remote.any():
processing_type = f"{processing_type}: Robust Remote Reference"
else:
processing_type = f"{processing_type}: Robust Single Station"
return processing_type

def memory_warning(self):
"""
Checks if we should be anticipating a RAM issue
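Finally, a small illustration of the label produced by the new processing_type() helper above, assuming only that dataset_df carries a boolean "remote" column, as it does in the kernel:

import pandas as pd

dataset_df = pd.DataFrame({"station_id": ["CAS04", "NVR08"], "remote": [False, True]})
processing_type = "Aurora"
if dataset_df.remote.any():
    processing_type = f"{processing_type}: Robust Remote Reference"
else:
    processing_type = f"{processing_type}: Robust Single Station"
# -> "Aurora: Robust Remote Reference"; with no remote rows it would be "Aurora: Robust Single Station"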