From 8df57ea4a3393cdc0c161d1567f5a7e8668e04ba Mon Sep 17 00:00:00 2001 From: Joe Zuntz Date: Tue, 24 Sep 2024 09:59:23 +0100 Subject: [PATCH] clear up the quantiles code a bit --- txpipe/diagnostics.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 39f6577d..bd0719b8 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -68,27 +68,32 @@ def run(self): for band in self.config["bands"]: col_names[f"mag_{band}"] = f"{shear_prefix}mag_{band}" + # We ask for quantiles at these points quantiles = np.linspace(0, 1, nedge, endpoint=True) percentiles = quantiles * 100 with self.open_input("shear_catalog") as f, self.open_input("shear_tomography_catalog") as g: - - # Create dask arrays of the columns. This loads lazily - cols = { - new_name: - da.from_array(f[f"shear/{old_name}"], chunks=chunk_rows) - for (new_name, old_name) in col_names.items() - } - - # Cut down to just the source selection + # We will be checking if the source is in a tomographic bin + # and doing quantiles only of selected objects (in any bin) bins = da.from_array(g["tomography/bin"], chunks=chunk_rows) selected = bins >= 0 - # Use dask to get the quantiles - quantile_values, = da.compute({ - name: da.percentile(col[selected], percentiles) - for name, col in cols.items() - }) + # We now build up the quantile values + quantile_values = {} + for new_name, old_name in cols.items(): + # Create dask arrays of the columns. This loads them lazily, + # so no data is actually loaded here. Only when the "compute" + # method is called below does anything actually happen. + col = da.from_array(f[f"shear/{old_name}"], chunks=chunk_rows) + + # Ask dask to compute the percentiles of this column. + # Again, it will not actually do anything until the "compute" + # method is called below. When that happens, it will + # chunk up the data and calculate the percentiles in parallel. 
+ quantile_values[new_name] = da.percentile(col[selected], percentiles) + + # Now ask dask to actually do the calculations + quantile_values, = da.compute(quantile_values) # Open the output file and save the results with self.open_output("shear_catalog_quantiles") as f: