From 5f3aaa71bd3d9d9460e21cabe7ac5f7f9b480f35 Mon Sep 17 00:00:00 2001
From: "Karl N. Kappler"
Date: Fri, 8 Sep 2023 08:36:54 -0700
Subject: [PATCH] Cleanup 01 and test using new framework

- fix endrow in argparse so it is int or None
---
 .../earthscope/01_test_load_spud_tfs.py     | 53 ++++++++++----------
 .../test_utils/earthscope/widescale_test.py | 52 ++++++++++++++-----
 2 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/aurora/test_utils/earthscope/01_test_load_spud_tfs.py b/aurora/test_utils/earthscope/01_test_load_spud_tfs.py
index 92e72ba3..1876128b 100644
--- a/aurora/test_utils/earthscope/01_test_load_spud_tfs.py
+++ b/aurora/test_utils/earthscope/01_test_load_spud_tfs.py
@@ -42,10 +42,9 @@
 # Config Params
-XML_SOURCES = ["emtf", "data"]
-N_PARTITIONS = 1
+DEFAULT_XML_SOURCES = ["emtf", "data"]
 
-def define_dataframe_schema():
+def define_dataframe_schema(xml_sources=DEFAULT_XML_SOURCES):
     """
     builds the csv defining column names, dtypes, and default values,
     and saves in standards/
@@ -62,7 +61,7 @@ def define_dataframe_schema():
     df = pd.read_csv(schema_csv)
 
     # augment with new columns
-    for xml_source in XML_SOURCES:
+    for xml_source in xml_sources:
         name = f"{xml_source}_error"
         dtype = "bool"
         default = 0
@@ -99,7 +98,7 @@ def __init__(self, **kwargs):
         """
         #super(WidesScaleTest, self).__init__(**kwargs)
         super().__init__(**kwargs)
-        self._residual_variance = None
+        self.xml_sources = kwargs.get("xml_sources", DEFAULT_XML_SOURCES)
 
     def prepare_jobs_dataframe(self):
 
@@ -131,7 +130,7 @@ def enrich_row(self, row):
         Returns:
 
         """
-        for xml_source in XML_SOURCES:
+        for xml_source in self.xml_sources:
             xml_path = SPUD_XML_PATHS[xml_source].joinpath(row[f"{xml_source}_xml_filebase"])
             try:
                 tf = load_xml_tf(xml_path)
@@ -151,37 +150,35 @@ def enrich_row(self, row):
 
 def summarize_errors():
     xml_sources = ["data", "emtf"]
-    df = load_most_recent_summary(1)
+    df = load_most_recent_summary(STAGE_ID)
     for xml_source in xml_sources:
         print(f"{xml_source} error \n {df[f'{xml_source}_error'].value_counts()}\n\n")
-    print("OK")
+    n_xml = len(df)
+    is_not_mda = df.data_xml_filebase.str.contains("__")
+    n_non_mda = is_not_mda.sum()
+    n_mda = len(df) - n_non_mda
+    print(f"There are {n_mda} / {n_xml} files with mda string")
+    print(f"There are {n_non_mda} / {n_xml} files without mda string")
+    # non_mda_df = df[is_not_mda]
+    return
+
 
 def main():
     define_dataframe_schema()
-    tester = TestLoadSPUDTFs(stage_id=1)
-    tester.run_test(row_end=10)
-    # # normal
-    # # results_df = batch_process(row_end=1)
-    # results_df = batch_process()
-    #
-    # # run only data
-    # #results_df = review_spud_tfs(xml_sources = ["data_xml_path", ])
-    summarize_errors()
+    # normal
+    tester = TestLoadSPUDTFs(stage_id=STAGE_ID)
+    # tester.endrow = 5
+    tester.run_test()
+    summarize_errors()
 
+    # run only data
+    # tester = TestLoadSPUDTFs(stage_id=STAGE_ID, xml_sources=["data",])
+    # tester.run_test()
+    # summarize_errors()
 
-    # # DEBUGGING
-    df = load_most_recent_summary(1)
-    # n_xml = len(df)
-    # is_not_mda = df.data_xml_path.str.contains("__")
-    # n_non_mda = is_not_mda.sum()
-    # n_mda = len(df) - n_non_mda
-    # print(f"There are {n_mda} / {n_xml} files with mda string ")
-    # print(f"There are {n_non_mda} / {n_xml} files without mda string ")
-    # non_mda_df = df[is_not_mda]
-    print("summarize")
-
+    return
 
 if __name__ == "__main__":

diff --git a/aurora/test_utils/earthscope/widescale_test.py b/aurora/test_utils/earthscope/widescale_test.py
index 35a14e0f..f142b287 100644
--- a/aurora/test_utils/earthscope/widescale_test.py
+++ b/aurora/test_utils/earthscope/widescale_test.py
@@ -12,20 +12,27 @@ def none_or_str(value):
         return None
     return value
 
+def none_or_int(value):
+    if value == 'None':
+        return None
+    return int(value)
 
 DEFAULT_N_PARTITIONS = 1
 
 class WidesScaleTest(object):
 
     def __init__(self, **kwargs):
+        self.parse_args()
         self.stage_id = kwargs.get("stage_id", None)
         self.jobs_df = None
-        self.n_partitions = kwargs.get("n_partitions", DEFAULT_N_PARTITIONS)
+
 
     def prepare_jobs_dataframe(self):
         """ Makes the dataframe that will be populated/iterated over """
-        pass
+        print("prepare_jobs_dataframe is not defined for Abstract Base Class")
+        raise NotImplementedError
 
     def enrich_row(self, row):
+        """ Will eventually get used by dask, but as a first step we need to make this a method that works with df.apply()"""
         print("Enrich Row is not defined for Abstract Base Class")
         raise NotImplementedError
 
@@ -39,23 +46,16 @@ def summary_table_filename(self):
         out_csv = get_summary_table_filename(self.stage_id)
         return out_csv
 
-    def run_test(self, row_start=0, row_end=None):
-
-
-        results_csv = self.summary_table_filename
-        enriched_df.to_csv(results_csv, index=False)
-        print(f"Took {time.time() - t0}s to review spud tfs, running with {N_PARTITIONS} partitions")
-        return enriched_df
-
-    def run_test(self, row_start=0, row_end=None, **kwargs):
+    def run_test(self):
         """ iterates over dataframe, enriching rows"""
         t0 = time.time()
         self.jobs_df = self.prepare_jobs_dataframe()
         df = copy.deepcopy(self.jobs_df)
-        if row_end is None:
-            row_end = len(df)
-        df = df[row_start:row_end]
+
+        if self.endrow is None:
+            self.endrow = len(df)
+        df = df[self.startrow:self.endrow]
 
         n_rows = len(df)
         print(f"nrows ---> {n_rows}")
 
@@ -73,6 +73,27 @@
         print(f"Took {time.time() - t0}s to run STAGE {self.stage_id} with {self.n_partitions} partitions")
         return enriched_df
 
+    def parse_args(self):
+        parser = argparse.ArgumentParser(description="Wide Scale Earthscope Test")
+        parser.add_argument("--npart", help="how many partitions to use (triggers dask dataframe if > 0)", type=int,
+                            default=1)
+        parser.add_argument("--startrow", help="First row to process (zero-indexed)", type=int, default=0)
+        parser.add_argument("--endrow", help="Last row to process (zero-indexed)", type=none_or_int, default=None,
+                            nargs='?', )
+
+        args, unknown = parser.parse_known_args()
+
+        print(f"npartitions = {args.npart}")
+        self.n_partitions = args.npart
+        print(f"startrow = {args.startrow} {type(args.startrow)}")
+        self.startrow = args.startrow
+        if isinstance(args.endrow, str):  # defensive; none_or_int already yields int or None
+            args.endrow = int(args.endrow)
+        print(f"endrow = {args.endrow} {type(args.endrow)}")
+        self.endrow = args.endrow
+
+        return args
+
     def report(self):
         pass
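---

Note on the none_or_int pattern (an illustrative sketch, not part of the patch):
argparse applies the callable passed as `type=` to the raw command-line token,
which is always a string, so a callable that maps the literal string "None" to
Python's None and everything else through int() yields an endrow that is int or
None, as the subject line describes. A minimal standalone demonstration,
assuming only the standard library (the patch additionally uses nargs='?',
omitted here for brevity):

    import argparse

    def none_or_int(value):
        # argparse hands over the raw CLI token as a string; map the
        # literal "None" to Python None, otherwise coerce to int
        if value == "None":
            return None
        return int(value)

    parser = argparse.ArgumentParser()
    parser.add_argument("--endrow", type=none_or_int, default=None)

    print(parser.parse_args([]).endrow)                    # None (default)
    print(parser.parse_args(["--endrow", "None"]).endrow)  # None
    print(parser.parse_args(["--endrow", "10"]).endrow)    # 10

Note that argparse does not pass non-string defaults through `type`, so
`default=None` stays None without needing the "None" sentinel string.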