diff --git a/examples/2.2i/pipeline.yml b/examples/2.2i/pipeline.yml index 54092ccda..ac55d3ddc 100644 --- a/examples/2.2i/pipeline.yml +++ b/examples/2.2i/pipeline.yml @@ -100,6 +100,7 @@ stages: # nodes: 2 # threads_per_process: 32 - name: TXTwoPointPlots + - name: TXDiagnosticQuantiles - name: TXSourceDiagnosticPlots nprocess: 16 - name: TXLensDiagnosticPlots diff --git a/examples/cosmodc2/config-20deg2.yml b/examples/cosmodc2/config-20deg2.yml index 69da07d4d..4c3d6185e 100644 --- a/examples/cosmodc2/config-20deg2.yml +++ b/examples/cosmodc2/config-20deg2.yml @@ -1,5 +1,6 @@ global: nside: 512 + pixelization: healpix TXGCRTwoCatalogInput: metacal_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/metacal_table_summary @@ -9,65 +10,70 @@ TXMetacalGCRInput: cat_name: dc2_object_run2.1i_dr1b_with_metacal_griz TXExposureInfo: - dc2_name: '1.2p' + dc2_name: 1.2p TXCosmoDC2Mock: cat_name: cosmoDC2_v1.1.4_image visits_per_band: 16 - extra_cols: redshift_true size_true shear_1 shear_2 Mag_true_r_sdss_z0 - flip_g2: True # to match metacal + extra_cols: redshift_true size_true shear_1 shear_2 Mag_true_r_sdss_z0 + flip_g2: true # to match metacal snr_limit: 4.0 Mag_r_limit: -19 - unit_response: True - apply_mag_cut: False + unit_response: true + apply_mag_cut: false TXIngestRedmagic: lens_zbin_edges: [0.15, 0.3, 0.45, 0.6, 0.75, 0.9] TXSourceTrueNumberDensity: + name: TXSourceTrueNumberDensity nz: 601 zmax: 3.0 chunk_rows: 100000 + weight_col: shear/00/weight + redshift_group: shear/00 TXLensTrueNumberDensity: + name: TXLensTrueNumberDensity nz: 601 zmax: 3.0 chunk_rows: 100000 + redshift_group: photometry TXLensMaps: - chunk_rows: 100000 + chunk_rows: 100000 pixelization: healpix - sparse: True + sparse: true TXSourceMaps: - sparse: True - chunk_rows: 100000 + sparse: true + chunk_rows: 100000 pixelization: healpix - true_shear: False + true_shear: false TXExternalLensMaps: - sparse: True - chunk_rows: 100000 + sparse: true + chunk_rows: 100000 pixelization: healpix TXExternalLensNoiseMaps: - chunk_rows: 100000 + chunk_rows: 100000 pixelization: healpix TXAuxiliarySourceMaps: - chunk_rows: 100000 - sparse: True - psf_prefix: psf_ + chunk_rows: 100000 + sparse: true + psf_prefix: psf_ TXAuxiliaryLensMaps: - chunk_rows: 100000 - sparse: True - bright_obj_threshold: 22.0 + chunk_rows: 100000 + sparse: true + bright_obj_threshold: 22.0 TXSimpleMask: depth_cut: 23.0 - bright_object_max: 10.0 + bright_object_max: 10.0 PZPDFMLZ: @@ -85,31 +91,53 @@ TXTrueNumberDensity: chunk_rows: 100000 TXSourceSelectorMetacal: - input_pz: False + input_pz: false bands: riz #used for selection T_cut: 0.5 s2n_cut: 10.0 max_rows: 1000 delta_gamma: 0.02 - source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3. ] # 7 bins + source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] # 4 bins chunk_rows: 100000 - true_z: False + true_z: false shear_prefix: mcal_ TXSourceSelectorMetadetect: - input_pz: False + input_pz: false bands: riz #used for selection T_cut: 0.5 s2n_cut: 10.0 max_rows: 1000 delta_gamma: 0.02 - source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3. ] # 7 bins + source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] 
# 4 bins chunk_rows: 100000 - true_z: False + true_z: false shear_prefix: '' +TXDiagnosticQuantiles: + nbins: 20 + psf_prefix: 00/mcal_psf_ + shear_prefix: 00/ + + + + +TXPhotozPlotLens: + name: TXPhotozPlotLens + +TXPhotozPlotSource: + name: TXPhotozPlotSource + +TXRandomForestLensSelector: + verbose: false + bands: ugrizy + lens_zbin_edges: [0.2, 0.4, 0.6, 0.8, 1.0, 1.2] + random_seed: 79521323 + mag_i_limit: 24.1 + selection_type: maglim + TXRandomCat: chunk_rows: 100000 @@ -135,7 +163,7 @@ TXRealGaussianCovariance: min_sep: 2.5 max_sep: 250. nbins: 20 - use_true_shear: False + use_true_shear: false nprocess: 4 threads_per_process: 2 nodes: 4 @@ -143,29 +171,29 @@ TXRealGaussianCovariance: TXTwoPointFourier: chunk_rows: 100000 - flip_g1: True - flip_g2: True + flip_g1: true + flip_g2: true apodization_size: 0.0 cache_dir: ./cache_nmt/cosmodc2/nside512/ - true_shear: False + true_shear: false n_ell: 30 ell_max: 1536 # nside * 3 , since Namaster computes that anyway. - analytic_noise: True - + analytic_noise: true + TXTwoPoint: bin_slop: 0.01 delta_gamma: 0.02 - do_pos_pos: True - do_shear_shear: True - do_shear_pos: True - flip_g2: True # use true when using metacal shears + do_pos_pos: true + do_shear_shear: true + do_shear_pos: true + flip_g2: true # use true when using metacal shears min_sep: 2.5 max_sep: 250 nbins: 20 verbose: 0 var_method: jackknife - + TXClusteringNoiseMaps: n_realization: 30 @@ -175,7 +203,7 @@ TXLensingNoiseMaps: TXTruthLensSelector: # Mag cuts chunk_rows: 100000 - lens_zbin_edges: [0.0,0.2,0.4] + lens_zbin_edges: [0.0, 0.2, 0.4] cperp_cut: 0.2 r_cpar_cut: 13.5 r_lo_cut: 16.0 diff --git a/examples/cosmodc2/config.yml b/examples/cosmodc2/config.yml index 6b7faf309..2249d6399 100644 --- a/examples/cosmodc2/config.yml +++ b/examples/cosmodc2/config.yml @@ -1,3 +1,7 @@ +global: + pixelization: healpix + chunk_rows: 1000000 + TXGCRTwoCatalogInput: metacal_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/metacal_table_summary photo_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/object_table_summary @@ -8,6 +12,15 @@ TXMetacalGCRInput: TXExposureInfo: dc2_name: 1.2p +TXLSSWeightsUnit: + nside: 2048 + pixelization: healpix + +TXDiagnosticQuantiles: + psf_prefix: mcal_psf_ + shear_prefix: mcal_ + nbins: 20 + TXCosmoDC2Mock: cat_name: cosmoDC2_v1.1.4_image @@ -26,7 +39,6 @@ TXSourceTrueNumberDensity: name: TXSourceTrueNumberDensity nz: 601 zmax: 3.0 - chunk_rows: 100000 weight_col: metacal/weight redshift_group: metacal @@ -34,11 +46,9 @@ TXLensTrueNumberDensity: name: TXLensTrueNumberDensity nz: 601 zmax: 3.0 - chunk_rows: 100000 redshift_group: photometry TXLensMaps: - chunk_rows: 100000 pixelization: healpix nside: 2048 sparse: true @@ -46,29 +56,26 @@ TXLensMaps: TXSourceMaps: nside: 2048 sparse: true - chunk_rows: 100000 pixelization: healpix true_shear: false TXExternalLensMaps: nside: 2048 sparse: true - chunk_rows: 100000 pixelization: healpix TXExternalLensNoiseMaps: nside: 2048 - chunk_rows: 100000 pixelization: healpix TXAuxiliarySourceMaps: - chunk_rows: 100000 sparse: true psf_prefix: psf_ TXAuxiliaryLensMaps: - chunk_rows: 100000 sparse: true + nside: 2048 + pixelization: healpix bright_obj_threshold: 22.0 TXSimpleMask: @@ -79,16 +86,14 @@ TXSimpleMask: PZPDFMLZ: nz: 301 zmax: 3.0 - chunk_rows: 100000 -TXPhotozStack: - chunk_rows: 100000 + # Mock version of stacking: TXTrueNumberDensity: nz: 301 zmax: 3.0 - chunk_rows: 100000 + TXSourceSelectorMetacal: input_pz: false @@ -99,7 +104,6 @@ TXSourceSelectorMetacal: 
delta_gamma: 0.02 source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] # 4 bins - chunk_rows: 100000 true_z: false shear_prefix: mcal_ @@ -112,13 +116,11 @@ TXSourceSelectorMetadetect: delta_gamma: 0.02 source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] # 4 bins - chunk_rows: 100000 true_z: false shear_prefix: '' TXRandomCat: - chunk_rows: 100000 density: 10 # gals per sq arcmin TXJackknifeCenters: @@ -141,13 +143,9 @@ TXRealGaussianCovariance: max_sep: 250. nbins: 20 use_true_shear: false - nprocess: 4 - threads_per_process: 2 - nodes: 4 galaxy_bias: [1.404, 1.458, 1.693, 1.922, 2.133] # Tinker bias values TXTwoPointFourier: - chunk_rows: 100000 flip_g1: true flip_g2: true apodization_size: 0.0 @@ -159,6 +157,7 @@ TXTwoPointFourier: analytic_noise: true TXTwoPoint: + reduce_randoms_size: 0.5 bin_slop: 0.01 delta_gamma: 0.02 do_pos_pos: true @@ -180,7 +179,6 @@ TXLensingNoiseMaps: TXTruthLensSelector: # Mag cuts - chunk_rows: 100000 lens_zbin_edges: [0.0, 0.2, 0.4] cperp_cut: 0.2 r_cpar_cut: 13.5 @@ -190,6 +188,16 @@ TXTruthLensSelector: i_hi_cut: 21.9 r_i_cut: 2.0 + +TXRandomForestLensSelector: + verbose: false + bands: ugrizy + lens_zbin_edges: [0.2, 0.4, 0.6, 0.8, 1.0, 1.2] + random_seed: 79521323 + mag_i_limit: 24.1 + selection_type: maglim + + TXPhotozPlotLens: name: TXPhotozPlotLens TXPhotozPlotSource: diff --git a/examples/cosmodc2/pipeline-20deg2.yml b/examples/cosmodc2/pipeline-20deg2.yml index 2f6df8368..ffc6d2a26 100644 --- a/examples/cosmodc2/pipeline-20deg2.yml +++ b/examples/cosmodc2/pipeline-20deg2.yml @@ -4,75 +4,113 @@ launcher: site: name: cori-interactive - image: ghcr.io/lsstdesc/txpipe + modules: txpipe python_paths: - - submodules/WLMassMap/python/desc/ - - submodules/FlexZPipe + - submodules/WLMassMap/python/desc/ + - submodules/FlexZPipe stages: - - name: TXRandomCat - nprocess: 32 - - name: TXSourceSelectorMetadetect - nprocess: 32 - - name: TXTruthLensSelector - nprocess: 32 - - name: TXLensTrueNumberDensity - - name: TXSourceTrueNumberDensity - - name: TXPhotozPlot - - name: TXShearCalibration - nprocess: 32 - - name: TXTruthLensCatalogSplitter - - name: TXTwoPointFourier - threads_per_process: 64 - - name: TXJackknifeCenters - - name: TXSourceMaps - nprocess: 8 - - name: TXLensMaps - nprocess: 8 - - name: TXAuxiliarySourceMaps - nprocess: 8 - - name: TXAuxiliaryLensMaps - nprocess: 8 - - name: TXDensityMaps - - name: TXSourceNoiseMaps - nprocess: 4 - nodes: 1 - threads_per_process: 1 - - name: TXLensNoiseMaps - nprocess: 4 - nodes: 1 - threads_per_process: 1 - - name: TXSimpleMask - - name: TXMapPlots - - name: TXTracerMetadata - - name: TXNullBlinding - - name: TXTwoPoint - threads_per_process: 64 - - name: TXTwoPointPlots - - name: TXFourierGaussianCovariance - threads_per_process: 32 - - name: TXTwoPointTheoryReal - - name: TXRealGaussianCovariance - threads_per_process: 32 - - name: TXConvergenceMaps - threads_per_process: 32 - - name: TXConvergenceMapPlots + - name: TXRandomCat + nprocess: 32 + - name: TXSourceSelectorMetadetect + nprocess: 32 + - name: TXLSSWeightsUnit + - name: TXRandomForestLensSelector + nprocess: 32 + - name: TXLensTrueNumberDensity + classname: TXTruePhotozStack + aliases: + tomography_catalog: lens_tomography_catalog + catalog: photometry_catalog + 
weights_catalog: none + photoz_stack: lens_photoz_stack + - name: TXSourceTrueNumberDensity + classname: TXTruePhotozStack + aliases: + tomography_catalog: shear_tomography_catalog + catalog: shear_catalog + weights_catalog: shear_catalog + photoz_stack: shear_photoz_stack + - name: TXPhotozPlotSource + classname: TXPhotozPlot + aliases: + photoz_stack: shear_photoz_stack + nz_plot: source_nz + - name: TXPhotozPlotLens + classname: TXPhotozPlot + aliases: + photoz_stack: lens_photoz_stack + nz_plot: lens_nz + - name: TXShearCalibration + nprocess: 32 + - name: TXTruthLensCatalogSplitter + nprocess: 32 + - name: TXTwoPointFourier + nprocess: 2 + nodes: 1 + threads_per_process: 64 + - name: TXTwoPointTheoryFourier + - name: TXTwoPointPlotsFourier + - name: TXJackknifeCenters + - name: TXSourceMaps + nprocess: 8 + - name: TXLensMaps + nprocess: 8 + - name: TXAuxiliarySourceMaps + nprocess: 8 + - name: TXAuxiliaryLensMaps + nprocess: 8 + - name: TXDensityMaps + - name: TXSourceNoiseMaps + nprocess: 4 + nodes: 1 + threads_per_process: 1 + - name: TXLensNoiseMaps + nprocess: 4 + nodes: 1 + threads_per_process: 1 + - name: TXSimpleMask + - name: TXMapPlots + - name: TXTracerMetadata + - name: TXNullBlinding + - name: TXTwoPoint + threads_per_process: 64 + nprocess: 1 + nodes: 1 + - name: TXTwoPointPlotsTheory + - name: TXDiagnosticQuantiles + nodes: 1 + nprocess: 16 + - name: TXLensDiagnosticPlots + nprocess: 16 + nodes: 1 + - name: TXSourceDiagnosticPlots + nprocess: 16 + nodes: 1 + - name: TXFourierGaussianCovariance + threads_per_process: 32 + - name: TXTwoPointTheoryReal + - name: TXRealGaussianCovariance + threads_per_process: 32 + - name: TXConvergenceMaps + threads_per_process: 32 + - name: TXConvergenceMapPlots output_dir: data/cosmodc2/outputs-20deg2 config: examples/cosmodc2/config-20deg2.yml inputs: # See README for paths to download these files - shear_catalog: ./data/cosmodc2/20deg2/shear_catalog.hdf5 - photometry_catalog: ./data/cosmodc2/20deg2/photometry_catalog.hdf5 + shear_catalog: ./data/cosmodc2/20deg2/shear_catalog.hdf5 + photometry_catalog: ./data/cosmodc2/20deg2/photometry_catalog.hdf5 fiducial_cosmology: data/fiducial_cosmology.yml - calibration_table: ./data/cosmodc2/20deg2/sample_cosmodc2_w10year_errors.dat - + calibration_table: ./data/cosmodc2/20deg2/sample_cosmodc2_w10year_errors.dat + none: none -resume: True +resume: true log_dir: data/cosmodc2/logs pipeline_log: data/cosmodc2/log.txt diff --git a/examples/cosmodc2/pipeline.yml b/examples/cosmodc2/pipeline.yml index 62a61df20..eb7451255 100644 --- a/examples/cosmodc2/pipeline.yml +++ b/examples/cosmodc2/pipeline.yml @@ -3,8 +3,8 @@ launcher: interval: 3.0 site: - name: cori-interactive - image: ghcr.io/lsstdesc/txpipe + name: nersc-interactive + modules: txpipe @@ -14,9 +14,11 @@ python_paths: stages: - name: TXRandomCat + nprocess: 16 - name: TXSourceSelectorMetacal nprocess: 32 - - name: TXTruthLensSelector + - name: TXLSSWeightsUnit + - name: TXRandomForestLensSelector nprocess: 32 - name: TXLensTrueNumberDensity classname: TXTruePhotozStack @@ -43,11 +45,15 @@ stages: photoz_stack: lens_photoz_stack nz_plot: lens_nz - name: TXShearCalibration + nprocess: 32 - name: TXTruthLensCatalogSplitter + nprocess: 32 - name: TXTwoPointFourier - nprocess: 2 + nprocess: 8 nodes: 2 threads_per_process: 32 + - name: TXTwoPointTheoryFourier + - name: TXTwoPointPlotsFourier - name: TXJackknifeCenters - name: TXSourceMaps nprocess: 8 @@ -67,15 +73,17 @@ stages: nodes: 1 threads_per_process: 1 - name: TXSimpleMask - - name: 
TXLSSWeightsUnit - name: TXMapPlots - name: TXTracerMetadata - name: TXNullBlinding - name: TXTwoPoint threads_per_process: 32 - nprocess: 2 + nprocess: 8 nodes: 2 - - name: TXTwoPointPlots + - name: TXTwoPointPlotsTheory + - name: TXDiagnosticQuantiles + nodes: 1 + nprocess: 16 - name: TXLensDiagnosticPlots nprocess: 16 nodes: 1 @@ -99,11 +107,11 @@ config: examples/cosmodc2/config.yml inputs: # See README for paths to download these files - shear_catalog: /global/projecta/projectdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/shear_catalog.hdf5 - photometry_catalog: /global/projecta/projectdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/photometry_catalog.hdf5 - photoz_trained_model: /global/projecta/projectdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/cosmoDC2_trees_i25.3.npy + shear_catalog: data/cosmodc2/inputs/shear_catalog.hdf5 + photometry_catalog: /global/cfs/cdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/photometry_catalog.hdf5 + photoz_trained_model: /global/cfs/cdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/cosmoDC2_trees_i25.3.npy fiducial_cosmology: data/fiducial_cosmology.yml - calibration_table: /global/projecta/projectdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/sample_cosmodc2_w10year_errors.dat + calibration_table: /global/cfs/cdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/sample_cosmodc2_w10year_errors.dat none: none resume: true diff --git a/examples/lensfit/config.yml b/examples/lensfit/config.yml index 892d4ed24..60a90165b 100644 --- a/examples/lensfit/config.yml +++ b/examples/lensfit/config.yml @@ -163,6 +163,13 @@ TXBlinding: b0: 0.95 ## we use bias of the form b0/g delete_unblinded: true +TXDiagnosticQuantiles: + psf_prefix: psf_ + shear_prefix: "" + nbins: 20 + bands: ri + + TXSourceDiagnosticPlots: psf_prefix: psf_ shear_prefix: '' diff --git a/examples/lensfit/pipeline.yml b/examples/lensfit/pipeline.yml index 98516f6a6..9e306a1f1 100644 --- a/examples/lensfit/pipeline.yml +++ b/examples/lensfit/pipeline.yml @@ -59,6 +59,7 @@ stages: threads_per_process: 2 - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later - name: TXTwoPointPlots + - name: TXDiagnosticQuantiles - name: TXSourceDiagnosticPlots - name: TXLensDiagnosticPlots - name: TXGammaTStars diff --git a/examples/metacal/pipeline.yml b/examples/metacal/pipeline.yml index 093097248..af5b73698 100644 --- a/examples/metacal/pipeline.yml +++ b/examples/metacal/pipeline.yml @@ -74,6 +74,7 @@ stages: threads_per_process: 2 - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later - name: TXTwoPointPlotsTheory # Make plots of 2pt correlations + - name: TXDiagnosticQuantiles - name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots - name: TXLensDiagnosticPlots # Make a suite of diagnostic plots - name: TXGammaTFieldCenters # Compute and plot gamma_t around center points diff --git a/examples/metadetect/config.yml b/examples/metadetect/config.yml index a6e764881..98cfeff07 100755 --- a/examples/metadetect/config.yml +++ b/examples/metadetect/config.yml @@ -41,9 +41,6 @@ TXLensTrueNumberDensity: PZPrepareEstimatorLens: name: PZPrepareEstimatorLens classname: BPZliteInformer - aliases: - input: spectroscopic_catalog - model: lens_photoz_model zmin: 0.0 zmax: 3.0 nzbins: 301 @@ -67,9 +64,6 @@ PZPrepareEstimatorLens: PZPrepareEstimatorSource: name: 
PZPrepareEstimatorSource classname: BPZliteInformer - aliases: - input: spectroscopic_catalog - model: source_photoz_model zmin: 0.0 zmax: 3.0 nzbins: 301 @@ -94,10 +88,6 @@ PZPrepareEstimatorSource: PZEstimatorLens: name: PZEstimatorLens classname: BPZliteEstimator - aliases: - model: lens_photoz_model - input: photometry_catalog - output: lens_photoz_pdfs zmin: 0.0 zmax: 3.0 dz: 0.01 @@ -219,6 +209,11 @@ TXBlinding: b0: 0.95 ## we use bias of the form b0/g delete_unblinded: true +TXDiagnosticQuantiles: + psf_prefix: 00/mcal_psf_ + shear_prefix: 00/ + nbins: 20 + TXSourceDiagnosticPlots: psf_prefix: 00/mcal_psf_ shear_prefix: 00/ @@ -315,14 +310,8 @@ PZRailSummarizeLens: nzbins: 50 name: PZRailSummarizeLens catalog_group: lens - tomography_name: "lens" + tomography_name: lens bands: ugrizy - aliases: - tomography_catalog: lens_tomography_catalog - binned_catalog: binned_lens_catalog - model: lens_direct_calibration_model - photoz_stack: lens_photoz_stack - photoz_realizations: lens_photoz_realizations PZRailSummarizeSource: @@ -331,38 +320,17 @@ PZRailSummarizeSource: zmax: 3.0 nzbins: 50 nsamples: 100 - mag_prefix: "/shear/00/mag_" + mag_prefix: /shear/00/mag_ catalog_group: shear tomography_name: source bands: riz name: PZRailSummarizeSource - aliases: - tomography_catalog: shear_tomography_catalog - binned_catalog: binned_shear_catalog - model: source_direct_calibration_model - photoz_stack: shear_photoz_stack - photoz_realizations: source_photoz_realizations - - FlowCreator: n_samples: 1000000 seed: 5763248 - aliases: - # This input was generated using get_example_flow in pzflow, - # not something specific. - output: ideal_specz_catalog - model: flow - InvRedshiftIncompleteness: pivot_redshift: 0.8 - aliases: - input: ideal_specz_catalog - output: specz_catalog_pq - GridSelection: - aliases: - input: ideal_specz_catalog - output: specz_catalog_pq redshift_cut: 5.1 ratio_file: data/example/inputs/hsc_ratios_and_specz.hdf5 settings_file: data/example/inputs/HSC_grid_settings.pkl @@ -374,47 +342,19 @@ GridSelection: TXParqetToHDF: hdf_group: photometry - aliases: - input: specz_catalog_pq - output: spectroscopic_catalog - - NZDirInformerSource: name: NZDirInformerSource usecols: [r, i, z] hdf5_groupname: photometry - aliases: - input: spectroscopic_catalog - model: source_direct_calibration_model - NZDirInformerLens: name: NZDirInformerLens - usecols: [u, g, r, i, z, "y"] + usecols: [u, g, r, i, z, y] hdf5_groupname: photometry - aliases: - input: spectroscopic_catalog - model: lens_direct_calibration_model - PZRealizationsPlotSource: name: PZRealizationsPlotSource - aliases: - photoz_realizations: source_photoz_realizations - photoz_realizations_plot: source_photoz_realizations_plot - PZRealizationsPlotLens: name: PZRealizationsPlotLens - aliases: - photoz_realizations: lens_photoz_realizations - photoz_realizations_plot: lens_photoz_realizations_plot - TXPhotozPlotSource: name: TXPhotozPlotSource - aliases: - photoz_stack: shear_photoz_stack - nz_plot: nz_source - TXPhotozPlotLens: name: TXPhotozPlotLens - aliases: - photoz_stack: lens_photoz_stack - nz_plot: nz_lens diff --git a/examples/metadetect/pipeline.yml b/examples/metadetect/pipeline.yml index cec17e444..eae543b01 100755 --- a/examples/metadetect/pipeline.yml +++ b/examples/metadetect/pipeline.yml @@ -74,7 +74,9 @@ stages: threads_per_process: 2 - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later - name: TXTwoPointPlots # Make plots of 2pt correlations + - name: TXDiagnosticQuantiles - 
name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots + nprocess: 2 - name: TXLensDiagnosticPlots # Make a suite of diagnostic plots - name: TXGammaTFieldCenters # Compute and plot gamma_t around center points threads_per_process: 2 diff --git a/examples/metadetect_source_only/config.yml b/examples/metadetect_source_only/config.yml index fec35aab4..5501835f6 100755 --- a/examples/metadetect_source_only/config.yml +++ b/examples/metadetect_source_only/config.yml @@ -111,6 +111,12 @@ TXBlinding: b0: 0.95 ## we use bias of the form b0/g delete_unblinded: true +TXDiagnosticQuantiles: + psf_prefix: 00/mcal_psf_ + shear_prefix: 00/ + nbins: 20 + + TXSourceDiagnosticPlots: psf_prefix: 00/mcal_psf_ shear_prefix: 00/ diff --git a/examples/metadetect_source_only/pipeline.yml b/examples/metadetect_source_only/pipeline.yml index 73aecf72d..49f7f2460 100755 --- a/examples/metadetect_source_only/pipeline.yml +++ b/examples/metadetect_source_only/pipeline.yml @@ -1,62 +1,62 @@ - # Stages to run stages: - - name: FlowCreator # Simulate a spectroscopic population - aliases: - output: ideal_specz_catalog - model: flow - - name: GridSelection # Simulate a spectroscopic sample - aliases: - input: ideal_specz_catalog - output: specz_catalog_pq - - name: TXParqetToHDF # Convert the spec sample format - aliases: - input: specz_catalog_pq - output: spectroscopic_catalog - - name: PZRailSummarizeSource # Run the DIR method on the lens sample to find n(z) - classname: PZRailSummarize - aliases: - tomography_catalog: shear_tomography_catalog - binned_catalog: binned_shear_catalog - model: source_direct_calibration_model - photoz_stack: shear_photoz_stack - photoz_realizations: source_photoz_realizations - - name: TXSourceSelectorMetadetect # select and split objects into source bins - - name: NZDirInformerSource # Prepare the DIR method inputs for the source sample - classname: NZDirInformer - aliases: - input: spectroscopic_catalog - model: source_direct_calibration_model - - name: TXShearCalibration # Calibrate and split the source sample tomographically - - name: TXSourceMaps # make source g1 and g2 maps - - name: TXAuxiliarySourceMaps # make PSF and flag maps - - name: TXSimpleMaskSource # combine maps to make a simple mask - - name: TXSourceNoiseMaps # Compute shear noise using rotations - - name: TXMapPlots # make pictures of all the maps - - name: TXTracerMetadata # collate metadata - - name: TXJackknifeCentersSource # Split the area into jackknife regions - - name: TXTwoPoint # Compute real-space 2-point correlations - threads_per_process: 2 - - name: TXBlinding # Blind the data following Muir et al - threads_per_process: 2 - - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later - - name: TXTwoPointPlots # Make plots of 2pt correlations - - name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots - - name: TXRoweStatistics # Compute and plot Rowe statistics - threads_per_process: 2 - - name: TXGalaxyStarShear # Compute and plot the star-galaxy shear cross-correlation - - name: TXPhotozPlotSource # Plot the bin n(z) - classname: TXPhotozPlot - aliases: - photoz_stack: shear_photoz_stack - nz_plot: nz_source - - name: PZRealizationsPlotSource # Plot n(z) realizations - classname: PZRealizationsPlot - aliases: - photoz_realizations: source_photoz_realizations - photoz_realizations_plot: source_photoz_realizations_plot - - name: TXConvergenceMaps # Make convergence kappa maps from g1, g2 maps - - name: TXConvergenceMapPlots # Plot the convergence map + - 
name: FlowCreator # Simulate a spectroscopic population + aliases: + output: ideal_specz_catalog + model: flow + - name: GridSelection # Simulate a spectroscopic sample + aliases: + input: ideal_specz_catalog + output: specz_catalog_pq + - name: TXParqetToHDF # Convert the spec sample format + aliases: + input: specz_catalog_pq + output: spectroscopic_catalog + - name: PZRailSummarizeSource # Run the DIR method on the lens sample to find n(z) + classname: PZRailSummarize + aliases: + tomography_catalog: shear_tomography_catalog + binned_catalog: binned_shear_catalog + model: source_direct_calibration_model + photoz_stack: shear_photoz_stack + photoz_realizations: source_photoz_realizations + - name: TXSourceSelectorMetadetect # select and split objects into source bins + - name: NZDirInformerSource # Prepare the DIR method inputs for the source sample + classname: NZDirInformer + aliases: + input: spectroscopic_catalog + model: source_direct_calibration_model + - name: TXShearCalibration # Calibrate and split the source sample tomographically + - name: TXSourceMaps # make source g1 and g2 maps + - name: TXAuxiliarySourceMaps # make PSF and flag maps + - name: TXSimpleMaskSource # combine maps to make a simple mask + - name: TXSourceNoiseMaps # Compute shear noise using rotations + - name: TXMapPlots # make pictures of all the maps + - name: TXTracerMetadata # collate metadata + - name: TXJackknifeCentersSource # Split the area into jackknife regions + - name: TXTwoPoint # Compute real-space 2-point correlations + threads_per_process: 2 + - name: TXBlinding # Blind the data following Muir et al + threads_per_process: 2 + - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later + - name: TXTwoPointPlots # Make plots of 2pt correlations + - name: TXDiagnosticQuantiles + - name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots + - name: TXRoweStatistics # Compute and plot Rowe statistics + threads_per_process: 2 + - name: TXGalaxyStarShear # Compute and plot the star-galaxy shear cross-correlation + - name: TXPhotozPlotSource # Plot the bin n(z) + classname: TXPhotozPlot + aliases: + photoz_stack: shear_photoz_stack + nz_plot: nz_source + - name: PZRealizationsPlotSource # Plot n(z) realizations + classname: PZRealizationsPlot + aliases: + photoz_realizations: source_photoz_realizations + photoz_realizations_plot: source_photoz_realizations_plot + - name: TXConvergenceMaps # Make convergence kappa maps from g1, g2 maps + - name: TXConvergenceMapPlots # Plot the convergence map # Disabled since not yet synchronised with new Treecorr MPI # - name: TXSelfCalibrationIA # Self-calibration intrinsic alignments of galaxies @@ -80,7 +80,7 @@ modules: > # where to find any modules that are not in this repo, # and any other code we need. 
python_paths: - - submodules/WLMassMap/python/desc/ + - submodules/WLMassMap/python/desc/ # Where to put outputs output_dir: data/example/metadetect_source_only @@ -124,7 +124,7 @@ inputs: # if supported by the launcher, restart the pipeline where it left off # if interrupted -resume: True +resume: true # where to put output logs for individual stages log_dir: data/example/metadetect_source_only # where to put an overall parsl pipeline log diff --git a/examples/redmagic/pipeline.yml b/examples/redmagic/pipeline.yml index 0d883fb7e..548ad0f66 100644 --- a/examples/redmagic/pipeline.yml +++ b/examples/redmagic/pipeline.yml @@ -40,6 +40,7 @@ stages: aliases: photoz_stack: lens_photoz_stack nz_plot: nz_lens + - name: TXDiagnosticQuantiles - name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots - name: TXLensDiagnosticPlots # Make a suite of diagnostic plots - name: TXGammaTFieldCenters # Compute and plot gamma_t around center points diff --git a/txpipe/auxiliary_maps.py b/txpipe/auxiliary_maps.py index f8ddd5ee2..4c6afa8ff 100644 --- a/txpipe/auxiliary_maps.py +++ b/txpipe/auxiliary_maps.py @@ -1,28 +1,19 @@ from .maps import TXBaseMaps, map_config_options import numpy as np from .base_stage import PipelineStage -from .mapping import ShearMapper, LensMapper, FlagMapper, BrightObjectMapper, DepthMapperDR1 +from .mapping import make_dask_shear_maps, make_dask_flag_maps, make_dask_bright_object_map, make_dask_depth_map from .data_types import MapsFile, HDFFile, ShearCatalog -from .utils import choose_pixelization, rename_iterated, read_shear_catalog_type +from .utils import choose_pixelization, import_dask +from .maps import map_config_options -class TXAuxiliarySourceMaps(TXBaseMaps): - """ - Generate auxiliary maps from the source catalog - - This stage makes maps of: - - the count of different flag values - - the mean PSF - These are currently only used for making visualizations in the later TXMapPlots - stage, and are not otherwise used directly. - Like most of the mapping stages it inherits most behavior from the TXBaseMaps - parent class, which specifies the primary `run` method. This is because most - mapper classes have the same overall structure. See that class for more details. - """ +class TXAuxiliarySourceMaps(PipelineStage): name = "TXAuxiliarySourceMaps" + dask_parallel = True + inputs = [ ("shear_catalog", ShearCatalog), # for psfs ("shear_tomography_catalog", HDFFile), # for per-bin psf maps @@ -31,161 +22,109 @@ class TXAuxiliarySourceMaps(TXBaseMaps): outputs = [ ("aux_source_maps", MapsFile), ] - config_options = { - "chunk_rows": 100_000, - "sparse": True, + "block_size": 0, "flag_exponent_max": 8, # flag bits go up to 2**8 by default "psf_prefix": "psf_", # prefix name for columns + **map_config_options } + def choose_pixel_scheme(self): with self.open_input("source_maps", wrapper=True) as maps_file: pix_info = dict(maps_file.file["maps"].attrs) return choose_pixelization(**pix_info) - def prepare_mappers(self, pixel_scheme): - # We make a suite of mappers here. 
- # We read nbin_source because we want PSF maps per-bin - with self.open_input("shear_tomography_catalog") as f: - nbin_source = f["tomography"].attrs["nbin"] - self.config["nbin_source"] = nbin_source # so it gets saved later - source_bins = list(range(nbin_source)) - - # For making psf_g1, psf_g2 maps, per source-bin - psf_mapper = ShearMapper( - pixel_scheme, source_bins, sparse=self.config["sparse"] - ) - - # for mapping the density of flagged objects - flag_mapper = FlagMapper( - pixel_scheme, self.config["flag_exponent_max"], sparse=self.config["sparse"] - ) - - return psf_mapper, flag_mapper - - def data_iterator(self): - psf_prefix = self.config["psf_prefix"] - shear_catalog_type = read_shear_catalog_type(self) - - # Flag column name depends on catalog type - if shear_catalog_type == "metacal": - shear_cols = [ - f"{psf_prefix}g1", - f"{psf_prefix}g2", - "mcal_flags", - "weight", - "ra", - "dec", - ] - renames = { - f"{psf_prefix}g1": "psf_g1", - f"{psf_prefix}g2": "psf_g2", - "mcal_flags": "flags", - } - elif shear_catalog_type == "metadetect": - shear_cols = [ - f"00/{psf_prefix}g1", - f"00/{psf_prefix}g2", - "00/flags", - "00/weight", - "00/ra", - "00/dec", - ] - renames = { - f"00/{psf_prefix}g1": "psf_g1", - f"00/{psf_prefix}g2": "psf_g2", - "00/flags": "flags", - "00/weight": "weight", - "00/ra": "ra", - "00/dec": "dec", - } - else: - shear_cols = [ - f"{psf_prefix}g1", - f"{psf_prefix}g2", - "flags", - "weight", - "ra", - "dec", - ] - renames = { - f"{psf_prefix}g1": "psf_g1", - f"{psf_prefix}g2": "psf_g2", - } - - # See maps.py for an explanation of this - it = self.combined_iterators( - self.config["chunk_rows"], - # first file - "shear_catalog", - "shear", - shear_cols, - # next file - "shear_tomography_catalog", - "tomography", - ["bin"], - ) - - return rename_iterated(it, renames) - - def accumulate_maps(self, pixel_scheme, data, mappers): - psf_mapper, flag_mapper = mappers - - psf_prefix = self.config["psf_prefix"] - - # Our different mappers want different data columns. - # We pull out the bits they need and give them just those. - - psf_data = { - "g1": data["psf_g1"], - "g2": data["psf_g2"], - "ra": data["ra"], - "dec": data["dec"], - "bin": data["bin"], - "weight": data["weight"], - } - - flag_data = { - "ra": data["ra"], - "dec": data["dec"], - "flags": data["flags"], - } + def run(self): + dask, da = import_dask() + import healpy - psf_mapper.add_data(psf_data) - flag_mapper.add_data(flag_data) + pixel_scheme = self.choose_pixel_scheme() + block_size = self.config["block_size"] + if block_size == 0: + block_size = "auto" - def finalize_mappers(self, pixel_scheme, mappers): - psf_mapper, flag_mapper = mappers + flag_exponent_max = self.config["flag_exponent_max"] - # Four different mappers - pix, counts, g1, g2, var_g1, var_g2, weight, _ = psf_mapper.finalize(self.comm) - flag_pixs, flag_maps = flag_mapper.finalize(self.comm) + # We have to keep this open throughout the process, because + # dask will internally load chunks of the input hdf5 data. + shear_cat = self.open_input("shear_catalog", wrapper=True) + shear_tomo = self.open_input("shear_tomography_catalog", wrapper=True) + nbin = shear_tomo.file['tomography'].attrs['nbin'] - # Collect all the maps + # The "all" bin is the non-tomographic case. 
+ bins = list(range(nbin)) + ["all"] maps = {} - - if self.rank != 0: - return maps - - # Save PSF maps - for b in psf_mapper.bins: - maps["aux_source_maps", f"psf/counts_{b}"] = (pix, counts[b]) - maps["aux_source_maps", f"psf/g1_{b}"] = (pix, g1[b]) - maps["aux_source_maps", f"psf/g2_{b}"] = (pix, g2[b]) - maps["aux_source_maps", f"psf/var_g2_{b}"] = (pix, var_g1[b]) - maps["aux_source_maps", f"psf/var_g2_{b}"] = (pix, var_g2[b]) - maps["aux_source_maps", f"psf/lensing_weight_{b}"] = (pix, weight[b]) - - # Save flag maps - for i, (p, m) in enumerate(zip(flag_pixs, flag_maps)): + group = shear_cat.get_primary_catalog_group() + + # These don't actually load all the data - everything is lazy + ra = da.from_array(shear_cat.file[f"{group}/ra"], block_size) + # force all columns to use the same block size + block_size = ra.chunksize + dec = da.from_array(shear_cat.file[f"{group}/dec"], block_size) + psf_g1 = da.from_array(shear_cat.file[f"{group}/psf_g1"], block_size) + psf_g2 = da.from_array(shear_cat.file[f"{group}/psf_g2"], block_size) + weight = da.from_array(shear_cat.file[f"{group}/weight"], block_size) + if shear_cat.catalog_type == "metacal": + flag_name = "mcal_flags" + else: + flag_name = "flags" + flag = da.from_array(shear_cat.file[f"{group}/{flag_name}"], block_size) + b = da.from_array(shear_tomo.file["tomography/bin"], block_size) + + # collate metadata + metadata = { + key: self.config[key] + for key in map_config_options + } + metadata["flag_exponent_max"] = flag_exponent_max + metadata['nbin'] = nbin + metadata['nbin_source'] = nbin + metadata.update(pixel_scheme.metadata) + + for i in bins: + if i == "all": + w = b >= 0 + else: + w = b == i + + count_map, g1_map, g2_map, weight_map, esq_map, var1_map, var2_map = make_dask_shear_maps( + ra[w], dec[w], psf_g1[w], psf_g2[w], weight[w], pixel_scheme) + + pix = da.where(weight_map > 0)[0] + + # Change output name + if i == "all": + i = "2D" + + maps[f"psf/counts_{i}"] = (pix, count_map[pix]) + maps[f"psf/g1_{i}"] = (pix, g1_map[pix]) + maps[f"psf/g2_{i}"] = (pix, g2_map[pix]) + maps[f"psf/var_g2_{i}"] = (pix, var1_map[pix]) + maps[f"psf/var_g2_{i}"] = (pix, var2_map[pix]) + maps[f"psf/var_{i}"] = (pix, esq_map[pix]) + maps[f"psf/lensing_weight_{i}"] = (pix, weight_map[pix]) + + # Now add the flag maps. These are not tomographic. 
+ pix, flag_maps = make_dask_flag_maps(ra, dec, flag, flag_exponent_max, pixel_scheme) + for j in range(flag_exponent_max): + maps[f"flags/flag_{2**j}"] = (pix, flag_maps[j][pix]) + + + maps, = dask.compute(maps) + + # Print out some info about the flag maps + for i in range(flag_exponent_max): f = 2**i - maps["aux_source_maps", f"flags/flag_{f}"] = (p, m) - # also print out some stats - t = m.sum() - print(f"Map shows total {t} objects with flag {f}") + count = maps[f"flags/flag_{f}"][1].sum() + print(f"Map shows total {count} objects with flag {f}") + + # write the output maps + with self.open_output("aux_source_maps", wrapper=True) as out: + for map_name, (pix, m) in maps.items(): + out.write_map(map_name, pix, m, metadata) + out.file['maps'].attrs.update(metadata) - return maps class TXAuxiliaryLensMaps(TXBaseMaps): @@ -194,11 +133,10 @@ class TXAuxiliaryLensMaps(TXBaseMaps): This class generates maps of: - depth - - psf - bright object counts - - flags """ name = "TXAuxiliaryLensMaps" + dask_parallel = True inputs = [ ("photometry_catalog", HDFFile), # for mags etc ] @@ -207,92 +145,81 @@ class TXAuxiliaryLensMaps(TXBaseMaps): ] config_options = { - "chunk_rows": 100_000, - "sparse": True, + "block_size": 0, "bright_obj_threshold": 22.0, # The magnitude threshold for a object to be counted as bright "depth_band": "i", # Make depth maps for this band "snr_threshold": 10.0, # The S/N value to generate maps for (e.g. 5 for 5-sigma depth) "snr_delta": 1.0, # The range threshold +/- delta is used for finding objects at the boundary } - # we dont redefine choose_pixel_scheme for lens_maps to prevent circular modules - - def prepare_mappers(self, pixel_scheme): - # We make a suite of mappers here. - - # for estimating depth per lens-bin - depth_mapper = DepthMapperDR1( - pixel_scheme, - self.config["snr_threshold"], - self.config["snr_delta"], - sparse=self.config["sparse"], - comm=self.comm, - ) - - # for mapping bright star fractions, for masks - brobj_mapper = BrightObjectMapper( - pixel_scheme, - self.config["bright_obj_threshold"], - sparse=self.config["sparse"], - comm=self.comm, - ) - - return depth_mapper, brobj_mapper - - def data_iterator(self): - band = self.config["depth_band"] - cols = ["ra", "dec", "extendedness", f"snr_{band}", f"mag_{band}"] - return self.iterate_hdf( - "photometry_catalog", "photometry", cols, self.config["chunk_rows"] - ) - - def accumulate_maps(self, pixel_scheme, data, mappers): - depth_mapper, brobj_mapper = mappers + def run(self): + # Import dask and alias it as 'da' + _, da = import_dask() + + + # Retrieve configuration parameters + block_size = self.config["block_size"] + if block_size == 0: + block_size = "auto" band = self.config["depth_band"] - # Our different mappers want different data columns. - # We pull out the bits they need and give them just those. - brobj_data = { - "mag": data[f"mag_{band}"], - "extendedness": data["extendedness"], - "ra": data["ra"], - "dec": data["dec"], - } - - depth_data = { - "mag": data[f"mag_{band}"], - "snr": data[f"snr_{band}"], - "ra": data["ra"], - "dec": data["dec"], - } - - depth_mapper.add_data(depth_data) - brobj_mapper.add_data(brobj_data) - - def finalize_mappers(self, pixel_scheme, mappers): - depth_mapper, brobj_mapper = mappers + # Open the input photometry catalog file. + # We can't use a "with" statement because we need to keep the file open + # while we're using dask. + f = self.open_input("photometry_catalog", wrapper=True) + + # Load photometry data into dask arrays. 
+ # This is lazy in dask, so we're not actually loading the data here. + ra = da.from_array(f.file["photometry/ra"], block_size) + block_size = ra.chunksize + dec = da.from_array(f.file["photometry/dec"], block_size) + extendedness = da.from_array(f.file["photometry/extendedness"], block_size) + snr = da.from_array(f.file[f"photometry/snr_{band}"], block_size) + mag = da.from_array(f.file[f"photometry/mag_{band}"], block_size) + + # Choose the pixelization scheme based on the configuration. + # Might need to review this to make sure we use the same scheme everywhere + pixel_scheme = choose_pixelization(**self.config) + + # Initialize a dictionary to store the maps. + # To start with this is all lazy too, until we call compute + maps = {} - # Four different mappers - depth_pix, depth_count, depth, depth_var = depth_mapper.finalize(self.comm) - brobj_pix, brobj_count, brobj_mag, brobj_mag_var = brobj_mapper.finalize( - self.comm - ) + # Create a bright object map using dask + pix1, bright_object_count_map = make_dask_bright_object_map( + ra, dec, mag, extendedness, self.config["bright_obj_threshold"], pixel_scheme) + maps["bright_objects/count"] = (pix1, bright_object_count_map[pix1]) - # Collect all the maps - maps = {} + # Create depth maps using dask + pix2, count_map, depth_map, depth_var = make_dask_depth_map( + ra, dec, mag, snr, self.config["snr_threshold"], self.config["snr_delta"], pixel_scheme) + maps["depth/depth"] = (pix2, depth_map[pix2]) + maps["depth/depth_count"] = (pix2, count_map[pix2]) + maps["depth/depth_var"] = (pix2, depth_var[pix2]) - if self.rank != 0: - return maps - # Save depth maps - maps["aux_lens_maps", "depth/depth"] = (depth_pix, depth) - maps["aux_lens_maps", "depth/depth_count"] = (depth_pix, depth_count) - maps["aux_lens_maps", "depth/depth_var"] = (depth_pix, depth_var) + maps, = da.compute(maps) - # Save bright object counts - maps["aux_lens_maps", "bright_objects/count"] = (brobj_pix, brobj_count) + # Prepare metadata for the maps. Copy the pixelization-related + # configuration options only here + metadata = { + key: self.config[key] + for key in map_config_options + if key in self.config + } + # Then add the other configuration options + metadata["depth_band"] = band + metadata["depth_snr_threshold"] = self.config["snr_threshold"] + metadata["depth_snr_delta"] = self.config["snr_delta"] + metadata["bright_obj_threshold"] = self.config["bright_obj_threshold"] + metadata.update(pixel_scheme.metadata) + + # Write the output maps to the output file + with self.open_output("aux_lens_maps", wrapper=True) as out: + for map_name, (pix, m) in maps.items(): + out.write_map(map_name, pix, m, metadata) + out.file['maps'].attrs.update(metadata) - return maps class TXUniformDepthMap(PipelineStage): diff --git a/txpipe/base_stage.py b/txpipe/base_stage.py index 9636c8317..39fcb1457 100755 --- a/txpipe/base_stage.py +++ b/txpipe/base_stage.py @@ -21,6 +21,19 @@ class PipelineStage(PipelineStageBase): def run(self): print("Please do not execute this stage again.") + def time_stamp(self, tag): + """ + Print a time stamp with an optional tag. + + Parameters + ---------- + tag: str + Additional info to print in the output line. Default is empty. 
+ """ + t = datetime.datetime.now() + print(f"Process {self.rank}: {tag} {t}") + sys.stdout + def memory_report(self, tag=None): """ Print a report about memory currently available diff --git a/txpipe/covariance.py b/txpipe/covariance.py index 8023da152..43404066b 100755 --- a/txpipe/covariance.py +++ b/txpipe/covariance.py @@ -12,6 +12,7 @@ import warnings import os import pickle +import sys # require TJPCov to be in PYTHONPATH d2r = np.pi / 180 @@ -30,7 +31,7 @@ class TXFourierGaussianCovariance(PipelineStage): measured. """ name = "TXFourierGaussianCovariance" - parallel = False + parallel = True do_xi = False inputs = [ @@ -81,7 +82,8 @@ def run(self): # C_ell covariance cov = self.compute_covariance(cosmo, meta, two_point_data=two_point_data) - self.save_outputs(two_point_data, cov) + if self.rank == 0: + self.save_outputs(two_point_data, cov) def save_outputs(self, two_point_data, cov): filename = self.get_output("summary_statistics_fourier") @@ -259,7 +261,7 @@ def compute_covariance_block( WT=None, ): import pyccl as ccl - from tjpcov.wigner_transform import bin_cov + from .utils.wigner_transform import bin_cov cl = {} @@ -406,8 +408,7 @@ def get_angular_bins(self, cl_sacc): return edges def make_wigner_transform(self, meta): - import threadpoolctl - from tjpcov.wigner_transform import WignerTransform + from .utils.wigner_transform import WignerTransform path = self.config["pickled_wigner_transform"] if path: @@ -419,22 +420,14 @@ def make_wigner_transform(self, meta): print(f"Precomputed wigner transform {path} not found.") print("Will compute it and then save it.") - # We don't want to use n processes with n threads each by accident, - # where n is the number of CPUs we have - # so for this bit of the code, which uses python's multiprocessing, - # we limit the number of threads that numpy etc can use. - # After this is finished this will switch back to allowing all the CPUs - # to be used for threading instead. 
- num_processes = int(os.environ.get("OMP_NUM_THREADS", 1)) - print("Generating Wigner Transform.") - with threadpoolctl.threadpool_limits(1): - WT = WignerTransform( - ell=meta["ell"], - theta=meta["theta"] * d2r, - s1_s2=[(2, 2), (2, -2), (0, 2), (2, 0), (0, 0)], - ncpu=num_processes, - ) - print("Computed Wigner Transform.") + self.time_stamp("Generating Wigner Transform") + WT = WignerTransform( + ell=meta["ell"], + theta=meta["theta"] * d2r, + s1_s2=[(2, 2), (2, -2), (0, 2), (2, 0), (0, 0)], + comm=self.comm, + ) + self.time_stamp("Completed Wigner Transform") if path: try: @@ -445,7 +438,6 @@ def make_wigner_transform(self, meta): # compute all the covariances and then combine them into one single giant matrix def compute_covariance(self, cosmo, meta, two_point_data): - from tjpcov.wigner_transform import bin_cov ccl_tracers, tracer_Noise = self.get_tracer_info( cosmo, meta, two_point_data=two_point_data @@ -496,12 +488,16 @@ def compute_covariance(self, cosmo, meta, two_point_data): for i in range(N2pt): tracer_comb1 = tracer_combs[i] + if i % self.size != self.rank: + continue + + count_xi_pm1 = 1 if i in range(xim_start, xim_end) else 0 for j in range(i, N2pt): tracer_comb2 = tracer_combs[j] print( - f"Computing {tracer_comb1} x {tracer_comb2}: chunk ({i},{j}) of ({N2pt},{N2pt})" + f"Rank {self.rank} computing {tracer_comb1} x {tracer_comb2}: chunk ({i},{j}) of ({N2pt},{N2pt})" ) count_xi_pm2 = 1 if j in range(xim_start, xim_end) else 0 @@ -551,6 +547,12 @@ def compute_covariance(self, cosmo, meta, two_point_data): cov_full[start_i:end_i, start_j:end_j] = cov_ij cov_full[start_j:end_j, start_i:end_i] = cov_ij.T + if self.comm is not None: + cov_full = self.comm.reduce(cov_full) + + if self.rank != 0: + return + try: np.linalg.cholesky(cov_full) except: @@ -574,7 +576,7 @@ class TXRealGaussianCovariance(TXFourierGaussianCovariance): TJPCov and a fiducial cosmology. """ name = "TXRealGaussianCovariance" - parallel = False + parallel = True do_xi = True inputs = [ diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index f8de05ab2..21b0816f8 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -11,9 +11,97 @@ band_variants, ) from .utils.fitting import fit_straight_line +from .utils import import_dask from .plotting import manual_step_histogram import numpy as np +class TXDiagnosticQuantiles(PipelineStage): + """ + Measure quantiles of various values in the shear catalog. + + This uses a library called "Distogram" which builds a + histogram that it gradually updates, tweaking the edges + as it goes along. This means we don't have to load + the full data into memory ever. + + The algorithm used can be noisy if there are large outliers in + the data, but after selection cuts are made this seems to be okay here, + with all the quantiles for the base quantities (T, s2n, T_psf, g1_psf) within + 10% of the true quantile and the majority within 1%. For our purposes + (defining bins for diagonstics) this is fine. 
+ """ + name = "TXDiagnosticQuantiles" + dask_parallel = True + inputs = [ + ("shear_catalog", ShearCatalog), + ("shear_tomography_catalog", TomographyCatalog), + ] + outputs = [ + ("shear_catalog_quantiles", HDFFile), + ] + config_options = { + "shear_prefix": "mcal_", + "psf_prefix": "mcal_psf_", + "nbins": 20, + "chunk_rows": 0, + "bands": "riz", + } + def run(self): + _, da = import_dask() + + # Configuration parameters + psf_prefix = self.config["psf_prefix"] + shear_prefix = self.config["shear_prefix"] + chunk_rows = self.config["chunk_rows"] + nedge = self.config["nbins"] + 1 + if chunk_rows == 0: + chunk_rows = "auto" + + # We canonicalise the names here + cols = [] + col_names = { + "psf_g1": f"{psf_prefix}g1", + "psf_T_mean": f"{psf_prefix}T_mean", + "s2n": f"{shear_prefix}s2n", + "T": f"{shear_prefix}T", + } + + for band in self.config["bands"]: + col_names[f"mag_{band}"] = f"{shear_prefix}mag_{band}" + + quantiles = np.linspace(0, 1, nedge, endpoint=True) + percentiles = quantiles * 100 + + with self.open_input("shear_catalog") as f, self.open_input("shear_tomography_catalog") as g: + cols = {} + for (new_name, old_name) in col_names.items(): + cols[new_name] = da.from_array(f[f"shear/{old_name}"], chunks=chunk_rows) + # force all chunks to be the same even if we set auto mode + if chunk_rows == "auto": + chunk_rows = cols[new_name].chunksize + + # Cut down to just the source selection + bins = da.from_array(g["tomography/bin"], chunks=chunk_rows) + selected = bins >= 0 + + # Use dask to get the quantiles + quantile_values, = da.compute({ + name: da.percentile(col[selected], percentiles) + for name, col in cols.items() + }) + + # Open the output file and save the results + with self.open_output("shear_catalog_quantiles") as f: + # put everything in a group called "quantiles" + g = f.create_group("quantiles") + # Save the quantile points + g.create_dataset("quantiles", data=quantiles) + # Save the quantile values themselves + for name, quantile_values in quantile_values.items(): + g.create_dataset(name, data=quantile_values) + + + class TXSourceDiagnosticPlots(PipelineStage): """ Make diagnostic plots of the shear catalog @@ -27,6 +115,7 @@ class TXSourceDiagnosticPlots(PipelineStage): inputs = [ ("shear_catalog", ShearCatalog), ("shear_tomography_catalog", TomographyCatalog), + ("shear_catalog_quantiles", HDFFile) ] outputs = [ @@ -187,15 +276,16 @@ def run(self): plotter.send(None) except StopIteration: pass - - def BinEdges(self, data,nbins): - import scipy.stats as sts - - # calculate bin edges for equal number of objects per bin - nbquant = np.linspace(0,1,nbins,endpoint=False) - edges = sts.mstats.mquantiles(data,prob=nbquant) - return edges + def get_bin_edges(self, col): + """ + Get the bin edges for a given column from the quantiles file + """ + col = col.removeprefix(self.config["shear_prefix"]) + with self.open_input("shear_catalog_quantiles") as f: + edges = f[f"quantiles/{col}"][:] + return edges + def plot_psf_shear(self): # mean shear in bins of PSF print("Making PSF shear plot") @@ -204,15 +294,8 @@ def plot_psf_shear(self): psf_prefix = self.config["psf_prefix"] delta_gamma = self.config["delta_gamma"] - nbins = self.config["nbins"] - g_min = self.config["g_min"] - g_max = self.config["g_max"] - - with self.open_input("shear_catalog") as c: - col = c[f"shear/{psf_prefix}g1"][:] - psfg = col[(col > g_min) & (col < g_max)] - psf_g_edges = self.BinEdges(psfg,nbins) - del psfg + + psf_g_edges = self.get_bin_edges("psf_g1") p1 = MeanShearInBins( f"{psf_prefix}g1", @@ 
-265,7 +348,7 @@ def plot_psf_shear(self): std_err21 = mc_cov[0, 0] ** 0.5 line21 = slope21 * (mu2) + intercept21 - slope22, intercept22, mc_cov = fit_straight_line(mu2, mean22, y_err=std22) + slope22, intercept22, mc_cov = fit_straight_line(mu2[idx], mean22[idx], y_err=std22) std_err22 = mc_cov[0, 0] ** 0.5 line22 = slope22 * (mu2) + intercept22 @@ -312,19 +395,9 @@ def plot_psf_size_shear(self): psf_prefix = self.config["psf_prefix"] delta_gamma = self.config["delta_gamma"] - nbins = self.config["nbins"] - psfT_min = self.config["psfT_min"] - psfT_max = self.config["psfT_max"] - - with self.open_input("shear_catalog") as c: - col = c[f"shear/{psf_prefix}T_mean"][:] - if ((self.config["shear_catalog_type"] == 'lensfit') & (self.config['psf_unit_conv']==True)): - pix2arcsec = 0.214 - col = col * pix2arcsec**2 - psfT = col[(col > psfT_min) & (col < psfT_max)] - psf_T_edges = self.BinEdges(psfT,nbins) - del psfT + psf_T_edges = self.get_bin_edges("psf_T_mean") + binnedShear = MeanShearInBins( f"{psf_prefix}T_mean", @@ -386,16 +459,9 @@ def plot_snr_shear(self): # Parameters of the binning in SNR shear_prefix = self.config["shear_prefix"] delta_gamma = self.config["delta_gamma"] - nbins = self.config["nbins"] - s2n_min = self.config["s2n_min"] - s2n_max = self.config["s2n_max"] - - with self.open_input("shear_catalog") as c: - col = c[f"shear/{shear_prefix}s2n"][:] - s2n = col[(col > s2n_min) & (col < s2n_max)] - snr_edges = self.BinEdges(s2n,nbins) - del s2n - + + snr_edges = self.get_bin_edges("s2n") + # This class includes all the cutting and calibration, both for # estimator and selection biases binnedShear = MeanShearInBins( @@ -458,20 +524,9 @@ def plot_size_shear(self): shear_prefix = self.config["shear_prefix"] delta_gamma = self.config["delta_gamma"] - nbins = self.config["nbins"] - - T_min = self.config["T_min"] - T_max = self.config["T_max"] - - with self.open_input("shear_catalog") as c: - col = c[f"shear/{shear_prefix}T"][:] - if ((self.config["shear_catalog_type"] == 'lensfit') & (self.config['psf_unit_conv']==True)): - pix2arcsec = 0.214 - col = col * pix2arcsec**2 - T = col[(col > T_min) & (col < T_max)] - T_edges = self.BinEdges(T,nbins) - del T - + + T_edges = self.get_bin_edges("T") + binnedShear = MeanShearInBins( f"{shear_prefix}T", T_edges, @@ -537,10 +592,10 @@ def plot_mag_shear(self): stat = {} binnedShear = {} for band in self.config["bands"]: - - with self.open_input("shear_catalog") as c: - col = c[f"shear/{shear_prefix}mag_{band}"][:] - m_edges = self.BinEdges(col,nbins) + m_edges = self.get_bin_edges(f"{shear_prefix}mag_{band}") + # with self.open_input("shear_catalog") as c: + # col = c[f"shear/{shear_prefix}mag_{band}"][:] + # m_edges = self.BinEdges(col,nbins) binnedShear[f"{band}"] = MeanShearInBins( f"{shear_prefix}mag_{band}", @@ -564,8 +619,11 @@ def plot_mag_shear(self): for band in self.config["bands"]: stat[f"mu_{band}"], stat[f"mean1_{band}"], stat[f"mean2_{band}"], stat[f"std1_{band}"], stat[f"std2_{band}"] = binnedShear[f"{band}"].collect(self.comm) - if self.rank != 0: - return + if self.rank != 0: + return + + for band in self.config["bands"]: + dx = 0.05 * (m_edges[1] - m_edges[0]) idx = np.where(np.isfinite(stat[f"mu_{band}"]))[0] @@ -986,7 +1044,7 @@ def run(self): f.close() def load_data(self): - import dask.array as da + _, da = import_dask() bands = self.config["bands"] # These need to stay open until dask has finished with them.: @@ -1005,6 +1063,11 @@ def load_data(self): data = {} for b in bands: data[f"mag_{b}"] = 
da.from_array(f[f"photometry/mag_{b}"], block) + # all the blocks must be the same size. If the columns are of different + # types then dask can sometimes select different block sizes for each column + # which can cause problems. So we force them to be the same size. + if block == "auto": + block = data[f"mag_{b}"].chunksize data[f"snr_{b}"] = da.from_array(f[f"photometry/snr_{b}"], block) data["bin"] = da.from_array(g["tomography/bin"], block) @@ -1036,8 +1099,7 @@ def plot_mag_histograms(self, data, nbin): self.plot_histograms(data, nbin, "mag", xlog, bins) def plot_histograms(self, data, nbin, name, xlog, bins): - import dask - import dask.array as da + dask, da = import_dask() import matplotlib.pyplot as plt bands = self.config["bands"] diff --git a/txpipe/lens_selector.py b/txpipe/lens_selector.py index 9af432c38..8d1ff8717 100755 --- a/txpipe/lens_selector.py +++ b/txpipe/lens_selector.py @@ -208,6 +208,10 @@ def write_global_values(self, outfile, number_density_stats): group = outfile["tomography"] group["counts"][:] = lens_counts group["counts_2d"][:] = lens_counts_2d + print("Final bin counts:") + for i, b in enumerate(lens_counts): + print(f"Bin {i}: {b:,}") + print("Total: ", lens_counts_2d) def select_lens(self, phot_data): t = self.config["selection_type"] @@ -395,7 +399,8 @@ class TXRandomForestLensSelector(TXBaseLensSelector): ("photometry_catalog", PhotometryCatalog), ("calibration_table", TextFile), ] - config_options = TXBaseLensSelector.config_options.copy().update({ + config_options = TXBaseLensSelector.config_options.copy() + config_options.update({ "verbose": False, "bands": "ugrizy", "chunk_rows": 10000, diff --git a/txpipe/mapping/__init__.py b/txpipe/mapping/__init__.py index 4a5e9fe84..5b0a59978 100755 --- a/txpipe/mapping/__init__.py +++ b/txpipe/mapping/__init__.py @@ -1,2 +1,2 @@ -from .dr1 import DepthMapperDR1, BrightObjectMapper -from .basic_maps import ShearMapper, LensMapper, FlagMapper +from .dr1 import make_dask_bright_object_map, make_dask_depth_map +from .basic_maps import make_dask_flag_maps, make_dask_shear_maps, make_dask_lens_maps diff --git a/txpipe/mapping/basic_maps.py b/txpipe/mapping/basic_maps.py index c94e727dd..f9a9cf283 100755 --- a/txpipe/mapping/basic_maps.py +++ b/txpipe/mapping/basic_maps.py @@ -1,297 +1,124 @@ -from ..utils import choose_pixelization, HealpixScheme, GnomonicPixelScheme -from parallel_statistics import ParallelMeanVariance, ParallelSum +from ..utils import import_dask +from parallel_statistics import ParallelSum import numpy as np -class ShearMapper: - def __init__( - self, - pixel_scheme, - bins, - sparse=False, - ): - self.pixel_scheme = pixel_scheme - self.bins = bins + ["2D"] - self.sparse = sparse - # TODO - replace this with arrays for faster lookup - # We index this with (bin_index, quantity) where - # quantity = 0 (lens weight), 1 (g1), 2 (g2), and - # 'weight' (source weight) - self.stats = {} - # We make maps of each bin independently, and also of the - # combined 2D case, for all selected objects. 
- for b in self.bins: - # For the lensing we are interested in both the mean and - # the variance of the g1 and g2 signals in each pixel, in - # each bin - for t in [1, 2]: - self.stats[(b, t)] = ParallelMeanVariance(self.pixel_scheme.npix) - self.stats[(b, "weight")] = ParallelSum(self.pixel_scheme.npix) - self.stats[(b, "esq")] = ParallelSum(self.pixel_scheme.npix) +def make_dask_flag_maps(ra, dec, flag, max_exponent, pixel_scheme): + """ + Flags are assumed to be bitmasks, and this makes maps showing + the number of objects with each flag set in each pixel. + + Parameters + ---------- + ra : dask array + Right ascension in degrees + + dec : dask array + Declination in degrees + + flag : dask array + Flag values for each object + + max_exponent : int + The maximum exponent for the flag. + + pixel_scheme : PixelScheme + The pixelization scheme to use, typically Healpix with a given nside + """ + _, da = import_dask() + npix = pixel_scheme.npix + pix = pixel_scheme.ang2pix(ra, dec) + + maps = [] + for i in range(max_exponent): + f = 2**i + flag_map = da.where(flag & f, 1, 0) + flag_map = da.bincount(pix, weights=flag_map, minlength=npix).astype(int) + maps.append(flag_map) + pix = da.unique(pix) + return pix, maps + +def make_dask_shear_maps(ra, dec, g1, g2, weight, pixel_scheme,): + _, da = import_dask() + import healpy + npix = pixel_scheme.npix + # This seems to work directly, but we should check performance + pix = pixel_scheme.ang2pix(ra, dec) + + # count map is just the number of galaxies per pixel + count_map = da.bincount(pix, minlength=npix) + + # For the other map we use bincount with weights - these are the + # various maps by pixel. bincount gives the number of objects in each + # vaue of the first argument, weighted by the weights keyword, so effectively + # it gives us + # p_i = sum_{j} x[j] * delta_{pix[j], i} + # which is out map + weight_map = da.bincount(pix, weights=weight, minlength=npix) + g1_map = da.bincount(pix, weights=weight * g1, minlength=npix) + g2_map = da.bincount(pix, weights=weight * g2, minlength=npix) + esq_map = da.bincount(pix, weights=weight**2 * 0.5 * (g1**2 + g2**2), minlength=npix) + + # normalize by weights to get the mean map value in each pixel + g1_map /= weight_map + g2_map /= weight_map + + # Generate a catalog-like vector of the means so we can + # subtract from the full catalog. Not sure if this ever actually gets + # created, or if dask just keeps a conceptual reference to it. + g1_mean = g1_map[pix] + g2_mean = g2_map[pix] + + # Also generate variance maps + var1_map = da.bincount(pix, weights=weight * (g1 - g1_mean)**2, minlength=npix) + var2_map = da.bincount(pix, weights=weight * (g2 - g2_mean)**2, minlength=npix) + + # we want the variance on the mean, so we divide by both the weight + # (to go from the sum to the variance) and then by the count (to get the + # variance on the mean). Have verified that this is the same as using + # var() on the original arrays. + var1_map /= (weight_map * count_map) + var2_map /= (weight_map * count_map) + + # replace nans with UNSEEN. The NaNs can occur if there are no objects + # in a pixel, so the value is undefined. 
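+    # (healpy.UNSEEN is healpy's standard sentinel for missing pixels,
+    # -1.6375e30, so downstream healpy routines treat these pixels as masked.)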
+ g1_map[da.isnan(g1_map)] = healpy.UNSEEN + g2_map[da.isnan(g2_map)] = healpy.UNSEEN + var1_map[da.isnan(var1_map)] = healpy.UNSEEN + var2_map[da.isnan(var2_map)] = healpy.UNSEEN + esq_map[da.isnan(esq_map)] = healpy.UNSEEN + + return count_map, g1_map, g2_map, weight_map, esq_map, var1_map, var2_map + + +def make_dask_lens_maps(ra, dec, weight, tomo_bin, target_bin, pixel_scheme): + # this will actually load numpy if a debug env var is set + _, da = import_dask() + + # pixel scheme + import healpy + npix = pixel_scheme.npix + pix = pixel_scheme.ang2pix(ra, dec) + + # one unweighted count and one weighted count + if target_bin == "2D": + hit = da.where(tomo_bin >= 0, 1, 0) + else: + hit = da.where(tomo_bin == target_bin, 1, 0) + weighted_hit = da.where(tomo_bin == target_bin, weight, 0) + + # convert to maps and hit pixels + count_map = da.bincount(pix, weights=hit, minlength=npix) + weight_map = da.bincount(pix, weights=weighted_hit, minlength=npix) + pix = da.unique(pix) + + # mask out nans + count_map[da.isnan(weight_map)] = healpy.UNSEEN + weight_map[da.isnan(weight_map)] = healpy.UNSEEN + + return pix, count_map, weight_map - def add_data(self, data): - npix = self.pixel_scheme.npix - n = len(data["ra"]) - # Get pixel indices - pix_nums = self.pixel_scheme.ang2pix(data["ra"], data["dec"]) - source_weights = data["weight"] - source_bins = data["bin"] - g1 = data["g1"] - g2 = data["g2"] - for i in range(n): - p = pix_nums[i] - - if p < 0 or p >= npix: - continue - - # accumulate weighted g1, g2 - # and overall weight - source_bin = source_bins[i] - if source_bin >= 0: - sw = source_weights[i] - self.stats[(source_bin, 1)].add_datum(p, g1[i], sw) - self.stats[(source_bin, 2)].add_datum(p, g2[i], sw) - self.stats[(source_bin, "weight")].add_datum(p, sw) - esq = 0.5 * (g1[i] ** 2 + g2[i] ** 2) - self.stats[(source_bin, "esq")].add_datum(p, esq * sw**2) - # Also save 2D case - self.stats[("2D", 1)].add_datum(p, g1[i], sw) - self.stats[("2D", 2)].add_datum(p, g2[i], sw) - self.stats[("2D", "weight")].add_datum(p, sw) - - - def finalize(self, comm=None): - from healpy import UNSEEN - - g1 = {} - g2 = {} - count = {} - var_g1 = {} - var_g2 = {} - source_weight = {} - var_e = {} - - rank = 0 if comm is None else comm.Get_rank() - pixel = np.arange(self.pixel_scheme.npix) - - # mask is one where *any* of the maps are valid. - # this lets us maintain a single pixelization for - # everything. - mask = np.zeros(self.pixel_scheme.npix, dtype=bool) - - is_master = (comm is None) or (comm.Get_rank() == 0) - - for b in self.bins: - if rank == 0: - print(f"Collating shear map for source bin {b}") - - # Now for sourc maps, we get the weighted mean and - # and variance for each bin, and the total weight - # (same for g1 and g2) separately. - stats_g1 = self.stats[(b, 1)] - stats_g2 = self.stats[(b, 2)] - stats_weight = self.stats[(b, "weight")] - stats_esq = self.stats[(b, "esq")] - - count_g1, mean_g1, v_g1 = stats_g1.collect(comm) - count_g2, mean_g2, v_g2 = stats_g2.collect(comm) - _, weight = stats_weight.collect(comm) - _, esq = stats_esq.collect(comm) - - if not is_master: - continue - - # The counts should be the same - if not, something has - # gone wrong. 
Check, then delete to save memory - assert np.all(count_g1 == count_g2) - del count_g2 - - # Convert variance-of-value to variance-of-mean, - # Since that is what we want for noise estimation - v_g1 /= count_g1 - v_g2 /= count_g1 - - # Update the mask - mask[weight > 0] = True - - # Repalce NaNs with the Healpix unseen sentinel value - # -1.6375e30 - mean_g1[np.isnan(mean_g1)] = UNSEEN - mean_g2[np.isnan(mean_g2)] = UNSEEN - v_g1[np.isnan(v_g1)] = UNSEEN - v_g2[np.isnan(v_g2)] = UNSEEN - weight[np.isnan(weight)] = UNSEEN - esq[np.isnan(esq)] = UNSEEN - - # Save the maps for this tomographic bin - g1[b] = mean_g1 - g2[b] = mean_g2 - source_weight[b] = weight - var_g1[b] = v_g1 - var_g2[b] = v_g2 - var_e[b] = esq - count[b] = count_g1 - - # Remove pixels not detected in anything - if self.sparse: - pixel = pixel[mask] - for d in [count, g1, g2, var_g1, var_g2, source_weight, var_e]: - for k, v in list(d.items()): - d[k] = v[mask] - - return pixel, count, g1, g2, var_g1, var_g2, source_weight, var_e - - -class LensMapper: - def __init__( - self, - pixel_scheme, - bins, - sparse=False, - ): - self.pixel_scheme = pixel_scheme - self.bins = bins + ["2D"] - self.sparse = sparse - # TODO - replace this with arrays for faster lookup - # We index this with (bin_index, quantity) where - # quantity = 0 (lens weight), 1 (g1), 2 (g2), and - # 'weight' (source weight) - self.stats = {} - for b in self.bins: - # We construct galaxy density maps by summing up weights - # among galaxies in bins per pixel - t = 0 - self.stats[(b, t)] = ParallelSum(self.pixel_scheme.npix) - - - def add_data(self, data): - npix = self.pixel_scheme.npix - n = len(data["ra"]) - - # Get pixel indices - pix_nums = self.pixel_scheme.ang2pix(data["ra"], data["dec"]) - lens_weights = data["lens_weight"] - lens_bins = data["bin"] - - for i in range(n): - p = pix_nums[i] - - if p < 0 or p >= npix: - continue - - lens_bin = lens_bins[i] - # accumulate lens weight - if lens_bin >= 0: - lw = lens_weights[i] - self.stats[(lens_bin, 0)].add_datum(p, lw) - self.stats[("2D", 0)].add_datum(p, lw) - - def finalize(self, comm=None): - from healpy import UNSEEN - - ngal = {} - lens_weight = {} - - rank = 0 if comm is None else comm.Get_rank() - pixel = np.arange(self.pixel_scheme.npix) - - # mask is one where *any* of the maps are valid. - # this lets us maintain a single pixelization for - # everything. - mask = np.zeros(self.pixel_scheme.npix, dtype=bool) - - is_master = (comm is None) or (comm.Get_rank() == 0) - - for b in self.bins: - if rank == 0: - print(f"Collating density map for lens bin {b}") - - # Collect together galaxy counts from each processor. - # This class lets us collect both the raw number and - # the total weight in each pixel. We save both maps - lens_stats = self.stats[(b, 0)] - count, weight = lens_stats.collect(comm) - - if not is_master: - continue - - # There's a bit of a difference between the number counts - # and the shear in terms of the value to use - # when no objects are seen. For the ngal we will use - # zero, because an observed but empty region should indeed - # have that. The number density for shear should be much - # higher, to the point where we don't have this issue. - # So we use UNSEEN for shear and 0 for counts. 
- count[np.isnan(count)] = 0 - weight[np.isnan(weight)] = 0 - - ngal[b] = count.flatten() - lens_weight[b] = weight.flatten() - mask[lens_weight[b] > 0] = True - - # Remove pixels not detected in anything - if self.sparse: - pixel = pixel[mask] - for d in [ngal, lens_weight]: - for k, v in list(d.items()): - d[k] = v[mask] - - return pixel, ngal, lens_weight - - -class FlagMapper: - def __init__(self, pixel_scheme, flag_exponent_max, sparse=False): - self.pixel_scheme = pixel_scheme - self.sparse = sparse - self.maps = [ - np.zeros(self.pixel_scheme.npix, dtype=np.int32) - for i in range(flag_exponent_max) - ] - self.flag_exponent_max = flag_exponent_max - - def add_data(self, data): - ra = data["ra"] - dec = data["dec"] - flags = data["flags"] - pix_nums = self.pixel_scheme.ang2pix(ra, dec) - for i, m in enumerate(self.maps): - f = 2**i - w = np.where(f & (flags > 0)) - for p in pix_nums[w]: - m[p] += 1 - - def _combine_root(self, comm): - from mpi4py.MPI import INT32_T, SUM - - rank = comm.Get_rank() - maps = [] - for i, m in enumerate(self.maps): - y = np.zeros_like(m) if rank == 0 else None - comm.Reduce([m, INT32_T], [y, INT32_T], op=SUM, root=0) - maps.append(y) - return maps - - def finalize(self, comm=None): - if comm is None: - maps = self.maps - else: - maps = self._combine_root(comm) - if comm.Get_rank() > 0: - return None, None - - pixel = np.arange(self.pixel_scheme.npix) - if self.sparse: - pixels = [] - maps_out = [] - for m in maps: - w = np.where(m > 0) - pixels.append(pixel[w]) - maps_out.append(m[w]) - else: - pixels = [pixel for m in maps] - maps_out = maps - return pixels, maps_out diff --git a/txpipe/mapping/dr1.py b/txpipe/mapping/dr1.py index 4e8ae6146..5c7671441 100755 --- a/txpipe/mapping/dr1.py +++ b/txpipe/mapping/dr1.py @@ -1,160 +1,89 @@ import numpy as np +from ..utils import import_dask from parallel_statistics import ParallelMeanVariance - -class DepthMapperDR1: - def __init__(self, pixel_scheme, snr_threshold, snr_delta, sparse=False, comm=None): - """Class to build up depth maps iteratively as we cycle through a data set. - - Two methods should be used: - - add_data which should be called each time a new data chunk is loaded - - finalize, at the end to collect the results - - Parameters - ---------- - pixel_scheme: PixelScheme object - Converter from angle to pixel - - snr_threshold: float - Value of SNR to use as the depth (e.g. 5.0 for 5 sigma depth) - - snr_delta: float, optional - Half-width of the SNR values to use for the depth estimation - - sparse: bool, optional - Whether to use sparse indexing for the calculation. Faster if only a small number of pixels - Are used. - - comm: MPI communicator, optional - An MPI comm for parallel processing. If None, calculation is serial. 
- - Returns - ------- - - pixel: array - Indices of all pixels with any objects - - count: array - Number of objects in each pixel - - depth: array - Estimated depth of each pixel - - depth_var: array - Estimated variance of depth of each pixel - - """ - self.pixel_scheme = pixel_scheme - self.snr_threshold = snr_threshold - self.snr_delta = snr_delta - self.comm = comm - self.sparse = sparse - self.stats = ParallelMeanVariance(pixel_scheme.npix, sparse=sparse) - - def add_data(self, data): - ra = data["ra"] - dec = data["dec"] - snr = data["snr"] - mags = data["mag"] - # Get healpix pixels - pix_nums = self.pixel_scheme.ang2pix(ra, dec) - - # For each found pixel find all values hitting that pixel - # and yield the index and their magnitudes - for p in np.unique(pix_nums): - mask = (pix_nums == p) & (abs(snr - self.snr_threshold) < self.snr_delta) - self.stats.add_data(p, mags[mask]) - - def finalize(self, comm=None): - - count, depth, depth_var = self.stats.collect(comm) - - # Generate the pixel indexing (if parallel and the master process) and - # convert from sparse arrays to pixel, index arrays.if sparse - if count is None: - pixel = None - elif self.sparse: - pixel, count = count.to_arrays() - _, depth = depth.to_arrays() - _, depth_var = depth_var.to_arrays() - else: - pixel = np.arange(len(depth)) - - return pixel, count, depth, depth_var - - -class BrightObjectMapper: - def __init__(self, pixel_scheme, mag_threshold, sparse=False, comm=None): - """Class to build up bright object maps iteratively as we cycle through a data set. - - Two methods should be used: - - add_data which should be called each time a new data chunk is loaded - - finalize, at the end to collect the results - - Parameters - ---------- - pixel_scheme: PixelScheme object - Converter from angle to pixel - - mag_threshold: float - Value of magnitude to use as the cutoff for bright objects - - sparse: bool, optional - Whether to use sparse indexing for the calculation. Faster if only a small number of pixels - Are used. - - comm: MPI communicator, optional - An MPI comm for parallel processing. If None, calculation is serial. 
- - Returns - ------- - - pixel: array - Indices of all pixels with bright objects - - count: array - Number of bright objects in each pixel - - brmag: array - mean magnitude of bright objects in each pixel - - brmag_var: array - variance of magnitude of bright objects in each pixel - - """ - self.pixel_scheme = pixel_scheme - self.mag_threshold = mag_threshold - self.comm = comm - self.sparse = sparse - self.stats = ParallelMeanVariance(pixel_scheme.npix, sparse=sparse) - - def add_data(self, data): - ra = data["ra"] - dec = data["dec"] - ext = data["extendedness"] - mags = data["mag"] - # Get healpix pixels - pix_nums = self.pixel_scheme.ang2pix(ra, dec) - - # For each found pixel find all values hitting that pixel - # and yield the index and their magnitudes - for p in np.unique(pix_nums): - mask = (pix_nums == p) & (ext == 0) & (mags < self.mag_threshold) - self.stats.add_data(p, mags[mask]) - - def finalize(self, comm=None): - - count, brmag, brmag_var = self.stats.collect(comm) - - # Generate the pixel indexing (if parallel and the master process) and - # convert from sparse arrays to pixel, index arrays.if sparse - if count is None: - pixel = None - elif self.sparse: - pixel, count = count.to_arrays() - _, brmag = brmag.to_arrays() - _, brmag_var = brmag_var.to_arrays() - else: - pixel = np.arange(len(brmag)) - - return pixel, count, brmag, brmag_var +def make_dask_bright_object_map(ra, dec, mag, extended, threshold, pixel_scheme): + """ + Create a map of bright objects using Dask. + + Parameters + ---------- + ra : dask.array + Right ascension values of the objects. + dec : dask.array + Declination values of the objects. + mag : dask.array + Magnitude values of the objects. + extended : dask.array + Extendedness flag of the objects (0 for point sources, 1 for extended sources). + threshold : float + Magnitude threshold to classify objects as bright. + pixel_scheme : PixelScheme + Pixelization scheme object with methods `npix` and `ang2pix`. + + Returns + ------- + tuple + A tuple containing: + - pix (dask.array): Unique pixel indices containing bright objects. + - bright_object_count_map (dask.array): Count map of bright objects per pixel. + """ + _, da = import_dask() + npix = pixel_scheme.npix + pix = pixel_scheme.ang2pix(ra, dec) + bright = da.where((mag < threshold) & (extended == 0), 1, 0) + bright_object_count_map = da.bincount(pix, weights=bright, minlength=npix).astype(int) + pix = da.unique(pix) + return pix, bright_object_count_map + +def make_dask_depth_map(ra, dec, mag, snr, threshold, delta, pixel_scheme): + """ + Generate a depth map using Dask, by finding the mean magnitude of + objects with a signal-to-noise ratio close to a given threshold. + + Parameters + ---------- + ra : dask.array + Right Ascension coordinates in degrees. + dec : dask.array + Declination coordinates in degrees. + mag : dask.array + Magnitudes of the objects, in band of user's choice + snr : dask.array + Signal-to-noise ratios of the objects, in the same band. + threshold : float + Threshold value for signal-to-noise ratio. + delta : float + Tolerance value for signal-to-noise ratio. + pixel_scheme : PixelScheme + An object that provides pixelization scheme with methods `npix` and `ang2pix`. + + Returns + ------- + tuple + A tuple containing: + - pix (dask.array): Unique pixel indices. + - count_map (dask.array): Count of objects per pixel. + - depth_map (dask.array): Mean depth per pixel. + - depth_var (dask.array): Variance of depth per pixel. 
+ """ + _, da = import_dask() + npix = pixel_scheme.npix + pix = pixel_scheme.ang2pix(ra, dec) + hit = da.where(abs(snr - threshold) < delta, 1, 0) + depth = da.where(abs(snr - threshold) < delta, mag, 0) + + # get the count and sum of the depth and depth^2 + count_map = da.bincount(pix, weights=hit, minlength=npix) + depth_map = da.bincount(pix, weights=depth, minlength=npix) + depth2_map = da.bincount(pix, weights=depth**2, minlength=npix) + + # convert to means + depth_map /= count_map + depth2_map /= count_map + + # get the variance from the mean depth + depth_var = depth2_map - depth_map**2 + + pix = da.unique(pix) + return pix, count_map, depth_map, depth_var \ No newline at end of file diff --git a/txpipe/maps.py b/txpipe/maps.py index aa246e859..50b0cdbd4 100644 --- a/txpipe/maps.py +++ b/txpipe/maps.py @@ -1,10 +1,8 @@ from .base_stage import PipelineStage -from .data_types import TomographyCatalog, MapsFile, HDFFile, ShearCatalog +from .data_types import TomographyCatalog, MapsFile, HDFFile import numpy as np -from .utils import unique_list, choose_pixelization, rename_iterated -from .utils.calibration_tools import read_shear_catalog_type -from .utils.calibrators import Calibrator -from .mapping import ShearMapper, LensMapper, FlagMapper +from .utils import unique_list, choose_pixelization, import_dask +from .mapping import make_dask_shear_maps, make_dask_lens_maps SHEAR_SHEAR = 0 @@ -167,8 +165,7 @@ class TXSourceMaps(PipelineStage): } def run(self): - import dask - import dask.array as da + dask, da = import_dask() import healpy # Configuration options @@ -191,60 +188,19 @@ def run(self): for i in bins: # These don't actually load all the data - everything is lazy ra = da.from_array(f[f"shear/bin_{i}/ra"], block_size) + block_size = ra.chunksize dec = da.from_array(f[f"shear/bin_{i}/dec"], block_size) g1 = da.from_array(f[f"shear/bin_{i}/g1"], block_size) g2 = da.from_array(f[f"shear/bin_{i}/g2"], block_size) weight = da.from_array(f[f"shear/bin_{i}/weight"], block_size) - # This seems to work directly, but we should check performance - pix = pixel_scheme.ang2pix(ra, dec) - - # count map is just the number of galaxies per pixel - count_map = da.bincount(pix, minlength=npix) - - # For the other map we use bincount with weights - these are the - # various maps by pixel. bincount gives the number of objects in each - # vaue of the first argument, weighted by the weights keyword, so effectively - # it gives us - # p_i = sum_{j} x[j] * delta_{pix[j], i} - # which is out map - weight_map = da.bincount(pix, weights=weight, minlength=npix) - g1_map = da.bincount(pix, weights=weight * g1, minlength=npix) - g2_map = da.bincount(pix, weights=weight * g2, minlength=npix) - esq_map = da.bincount(pix, weights=weight**2 * 0.5 * (g1**2 + g2**2), minlength=npix) - - # normalize by weights to get the mean map value in each pixel - g1_map /= weight_map - g2_map /= weight_map - - # Generate a catalog-like vector of the means so we can - # subtract from the full catalog. Not sure if this ever actually gets - # created, or if dask just keeps a conceptual reference to it. - g1_mean = g1_map[pix] - g2_mean = g2_map[pix] - - # Also generate variance maps - var1_map = da.bincount(pix, weights=weight * (g1 - g1_mean)**2, minlength=npix) - var2_map = da.bincount(pix, weights=weight * (g2 - g2_mean)**2, minlength=npix) - - # we want the variance on the mean, so we divide by both the weight - # (to go from the sum to the variance) and then by the count (to get the - # variance on the mean). 
Have verified that this is the same as using - # var() on the original arrays. - var1_map /= (weight_map * count_map) - var2_map /= (weight_map * count_map) - + count_map, g1_map, g2_map, weight_map, esq_map, var1_map, var2_map = make_dask_shear_maps( + ra, dec, g1, g2, weight, pixel_scheme) + # slight change in output name if i == "all": i = "2D" - # replace nans with UNSEEN. The NaNs can occur if there are no objects - # in a pixel, so the value is undefined. - g1_map[da.isnan(g1_map)] = healpy.UNSEEN - g2_map[da.isnan(g2_map)] = healpy.UNSEEN - var1_map[da.isnan(var1_map)] = healpy.UNSEEN - var2_map[da.isnan(var2_map)] = healpy.UNSEEN - esq_map[da.isnan(esq_map)] = healpy.UNSEEN # Save all the stuff we want here. output[f"count_{i}"] = count_map @@ -277,6 +233,7 @@ def run(self): } metadata['nbin'] = nbin metadata['nbin_source'] = nbin + metadata.update(pixel_scheme.metadata) pix = np.where(output["mask"])[0] @@ -295,8 +252,7 @@ def run(self): out.file['maps'].attrs.update(metadata) - -class TXLensMaps(TXBaseMaps): +class TXLensMaps(PipelineStage): """ Make tomographic lens number count maps @@ -306,69 +262,84 @@ class TXLensMaps(TXBaseMaps): """ name = "TXLensMaps" + dask_parallel = True inputs = [ ("photometry_catalog", HDFFile), ("lens_tomography_catalog", TomographyCatalog), ] - outputs = [ ("lens_maps", MapsFile), ] - config_options = {**map_config_options} + config_options = { + "block_size": 0, + **map_config_options + } - def prepare_mappers(self, pixel_scheme): - # read nbin_lens and save - with self.open_input("lens_tomography_catalog") as f: - nbin_lens = f["tomography"].attrs["nbin"] - self.config["nbin_lens"] = nbin_lens + def ra_dec_inputs(self): + return "photometry_catalog", "photometry" - # create lone mapper - lens_bins = list(range(nbin_lens)) - source_bins = [] - mapper = LensMapper( - pixel_scheme, - lens_bins, - sparse=self.config["sparse"], - ) - return [mapper] - def data_iterator(self): - # see TXSourceMaps abov for info on this - return self.combined_iterators( - self.config["chunk_rows"], - # first file - "photometry_catalog", - "photometry", - ["ra", "dec"], - # next file - "lens_tomography_catalog", - "tomography", - ["bin", "lens_weight"], - ) + def run(self): + import healpy + _, da = import_dask() - def accumulate_maps(self, pixel_scheme, data, mappers): - # no need to rename cols here since just ra, dec, lens_bin - mapper = mappers[0] - mapper.add_data(data) - def finalize_mappers(self, pixel_scheme, mappers): - # Again just the one mapper - mapper = mappers[0] - # Ignored return values are empty dicts for shear - pix, ngal, weighted_ngal = mapper.finalize(self.comm) + + # The subclass reads the ra and dec of the lenses + # from a different input file, so we allow that here + # using this method which is overwrites + cat_name, cat_group = self.ra_dec_inputs() + + # open our two input files. They will be read lazily + tomo_cat = self.open_input("lens_tomography_catalog", wrapper=True) + photo_cat = self.open_input(cat_name, wrapper=True) + + nbin_lens = tomo_cat.read_nbin() + self.config["nbin_lens"] = nbin_lens + + + # Other config info. 
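+        # choose_pixelization builds the pixel scheme (e.g. HealpixScheme when
+        # pixelization is healpix) from the stage config; block_size = 0 below
+        # means "auto", i.e. let dask pick the chunk size.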
+ pixel_scheme = choose_pixelization(**self.config) + block_size = self.config["block_size"] + if block_size == 0: + block_size = "auto" + + # Lazily open input data sets + ra = da.from_array(photo_cat.file[f"{cat_group}/ra"], block_size) + block_size = ra.chunksize + dec = da.from_array(photo_cat.file[f"{cat_group}/dec"], block_size) + weight = da.from_array(tomo_cat.file["tomography/lens_weight"], block_size) + tomo_bin = da.from_array(tomo_cat.file["tomography/bin"], block_size) + + # bins to generate maps for + bins = list(range(nbin_lens)) + ["2D"] maps = {} - if self.rank != 0: - return maps + # Generate the maps with dask. This is lazy until we do da.compute + for b in bins: + pix, count_map, weight_map = make_dask_lens_maps(ra, dec, weight, tomo_bin, b, pixel_scheme) + maps[f"ngal_{b}"] = (pix, count_map[pix]) + maps[f"weighted_ngal_{b}"] = (pix, weight_map[pix]) - for b in mapper.bins: - # keys are the output tag and the map name - maps["lens_maps", f"ngal_{b}"] = (pix, ngal[b]) - maps["lens_maps", f"weighted_ngal_{b}"] = (pix, weighted_ngal[b]) + # Actually run everything + maps, = da.compute(maps) - return maps + # collate metadata + metadata = { + key: self.config[key] + for key in map_config_options + } + metadata['nbin'] = nbin_lens + metadata['nbin_lens'] = nbin_lens + metadata.update(pixel_scheme.metadata) + + # Save all the maps + with self.open_output("lens_maps", wrapper=True) as out: + for name, (pix, m) in maps.items(): + out.write_map(name, pix, m, metadata) + out.file['maps'].attrs.update(metadata) class TXExternalLensMaps(TXLensMaps): @@ -386,22 +357,12 @@ class TXExternalLensMaps(TXLensMaps): ("lens_tomography_catalog", TomographyCatalog), ] - config_options = {**map_config_options} + def ra_dec_inputs(self): + return "lens_catalog", "lens" + + + - def data_iterator(self): - # See TXSourceMaps for an explanation of thsis - return self.combined_iterators( - self.config["chunk_rows"], - # first file - "lens_catalog", - "lens", - ["ra", "dec"], - # next file - "lens_tomography_catalog", - "tomography", - ["bin", "lens_weight"], - # another section in the same file - ) class TXDensityMaps(PipelineStage): diff --git a/txpipe/plotting/correlations.py b/txpipe/plotting/correlations.py index 115b4ceed..d076a6817 100755 --- a/txpipe/plotting/correlations.py +++ b/txpipe/plotting/correlations.py @@ -269,7 +269,7 @@ def make_plot(corr, obs_data, obs_theory, fig=None, xlogscale=True, ratios=False a.errorbar( theta, xi / xi_th, - err / xi_th, + err / abs(xi_th), fmt=".", label="TXPipe Data/CCL Theory", ) diff --git a/txpipe/test/test_mapper.py b/txpipe/test/test_mapper.py deleted file mode 100644 index 35c49f71c..000000000 --- a/txpipe/test/test_mapper.py +++ /dev/null @@ -1,58 +0,0 @@ -import numpy as np -from ..mapping import ShearMapper -from ..utils import choose_pixelization -import healpy - - -def test_mapper(): - # 12 pixels - nside = 1 - scheme = choose_pixelization(pixelization="healpix", nside=nside) - mapper = ShearMapper(scheme, [0], [0]) - npix = healpy.nside2npix(nside) - pix = np.arange(npix) - ra, dec = healpy.pix2ang(nside, pix, lonlat=True) - - N = 5 - for i in range(N): - data = { - "ra": ra, - "dec": dec, - "bin": np.zeros(npix, dtype=float), - "weight": np.ones(npix, dtype=float), - "g1": pix.astype(float) * 2, - "g2": np.ones(npix) * i, - } - # same data multiple times so we can - # get variances - mapper.add_data(data) - - ( - pixel, - ngal, - g1, - g2, - var_g1, - var_g2, - source_weight, - esq, - ) = mapper.finalize() - - mu_2 = (N - 1) / 2 - # variance 
- var_2 = (N - 1) * (2 * N - 1) / 6 - mu_2**2 - # variance on mean - var_2 /= N - - # should see all pixels - assert np.allclose(pixel, pix) - # all lens pixels hit once - assert np.all(ngal[0] == 5) - # averaged down - assert np.allclose(g1[0], pix * 2) - assert np.allclose(g2[0], mu_2) - # no variance for g1, since constant - # for - assert np.allclose(var_g1[0], 0.0) - assert np.allclose(var_g2[0], var_2) - assert np.allclose(source_weight[0], 5) diff --git a/txpipe/theory.py b/txpipe/theory.py index e315f565e..98d06a334 100644 --- a/txpipe/theory.py +++ b/txpipe/theory.py @@ -21,6 +21,7 @@ class TXTwoPointTheoryReal(PipelineStage): config_options = { "galaxy_bias": [0.0], + "smooth": False, } def run(self): @@ -51,7 +52,7 @@ def run(self): bias = None elif len(bias) == 1 and bias[0] < 0: bias = -bias[0] - s_theory = theory_3x2pt(cosmo, s, bias=bias) + s_theory = theory_3x2pt(cosmo, s, bias=bias, smooth=self.config["smooth"]) # Remove covariance s_theory.covariance = None @@ -79,6 +80,7 @@ class TXTwoPointTheoryFourier(TXTwoPointTheoryReal): config_options = { "galaxy_bias": [0.0], + "smooth": False, } def run(self): @@ -95,7 +97,7 @@ def run(self): ) print(cosmo) - s_theory = theory_3x2pt(cosmo, s) + s_theory = theory_3x2pt(cosmo, s, smooth=self.config["smooth"]) # Remove covariance s_theory.covariance = None diff --git a/txpipe/twopoint_fourier.py b/txpipe/twopoint_fourier.py index bff271861..aef0e17a4 100755 --- a/txpipe/twopoint_fourier.py +++ b/txpipe/twopoint_fourier.py @@ -323,16 +323,20 @@ def load_maps(self): density_fields = [ (nmt.NmtField(clustering_weight, [d], n_iter=0, lmax=lmax1)) for d in d_maps ] + print("Generated density fields") else: density_fields = [] + print("Skipping density fields") if self.config["do_shear_pos"] or self.config["do_shear_shear"]: lensing_fields = [ (nmt.NmtField(lw, [g1, g2], n_iter=0, lmax=lmax1)) for (lw, g1, g2) in zip(lensing_weights, g1_maps, g2_maps) ] + print("Generated lensing fields") else: lensing_fields = [] + print("Skipping lensing fields") # Collect together all the maps we will output maps = { @@ -651,7 +655,7 @@ def window_pixel(ell, nside): # Save all the results, skipping things we don't want like EB modes for index, name in results_to_use: - win = bandpowers[index, :, index, :] + win = bandpowers[index, :, index, :].copy() self.results.append( Measurement( name, diff --git a/txpipe/utils/__init__.py b/txpipe/utils/__init__.py index 5a81c4262..6f02561ce 100755 --- a/txpipe/utils/__init__.py +++ b/txpipe/utils/__init__.py @@ -14,4 +14,5 @@ from .calibration_tools import read_shear_catalog_type, band_variants, metacal_variants, metadetect_variants from .calibration_tools import MetacalCalculator, LensfitCalculator, MeanShearInBins from .conversion import nanojansky_err_to_mag_ab, nanojansky_to_mag_ab, moments_to_shear, mag_ab_to_nanojansky -from .timer import Timer \ No newline at end of file +from .timer import Timer +from .debuggable_dask import import_dask diff --git a/txpipe/utils/calibration_tools.py b/txpipe/utils/calibration_tools.py index 9da47890b..5d0f8fff4 100755 --- a/txpipe/utils/calibration_tools.py +++ b/txpipe/utils/calibration_tools.py @@ -172,7 +172,7 @@ def __init__(self, selector, delta_gamma, resp_mean_diag=False): self.selector = selector self.count = 0 self.sum_weights = 0 - self.sum_weights_sq = 0 + self.sum_sq_weights = 0 self.delta_gamma = delta_gamma self.resp_mean_diag = resp_mean_diag self.cal_bias_means = ParallelMean(size=4) @@ -224,7 +224,7 @@ def add_data(self, data, *args, **kwargs): # Record 
the count for this chunk, for summation later self.count += n self.sum_weights += np.sum(weight[sel_00]) - self.sum_weights_sq += np.sum(weight[sel_00]**2) + self.sum_sq_weights += np.sum(weight[sel_00]**2) # This is the estimator response, correcting bias of the shear estimator itself @@ -278,21 +278,24 @@ def collect(self, comm=None, allgather=False): Estimator response matrix S: 2x2 array Selection bias matrix + + Neff: float + Sum(weights)**2 / Sum(weights**2) """ # collect all the things we need if comm is not None: if allgather: count = comm.allreduce(self.count) sum_weights = comm.allreduce(self.sum_weights) - sum_weights_sq = comm.allreduce(self.sum_weights_sq) + sum_sq_weights = comm.allreduce(self.sum_sq_weights) else: count = comm.reduce(self.count) sum_weights = comm.reduce(self.sum_weights) - sum_weights_sq = comm.reduce(self.sum_weights_sq) + sum_sq_weights = comm.reduce(self.sum_sq_weights) else: count = self.count sum_weights = self.sum_weights - sum_weights_sq = self.sum_weights_sq + sum_sq_weights = self.sum_sq_weights # Collect the mean values we need mode = "allgather" if allgather else "gather" @@ -317,6 +320,11 @@ def collect(self, comm=None, allgather=False): S_mean[1, 1] = S[6] - S[7] S_mean /= self.delta_gamma + if sum_weights is None: + Neff = None + else: + Neff = sum_weights**2/sum_sq_weights + if self.resp_mean_diag: # Sets response to scalar R[0,0]==R[1,1] = (R[0,0]+R[1,1])/2 and nulls the off-diagonal elements (used in DES-Y3) print("Setting R[0,0]==R[1,1] = (R[0,0]+R[1,1])/2") @@ -328,7 +336,7 @@ def collect(self, comm=None, allgather=False): S_mean[1,0]=S_mean[0,1]=0 S_mean[0,0]=S_mean[1,1]=Savg - return R_mean, S_mean, count, sum_weights**2/sum_weights_sq + return R_mean, S_mean, count, Neff class MetaDetectCalculator: @@ -349,7 +357,7 @@ def __init__(self, selector, delta_gamma): self.mean_e = ParallelMean(size=10) self.counts = np.zeros(5, dtype=int) self.sum_weights = np.zeros(5, dtype=int) - self.sum_weights_sq = np.zeros(5, dtype=int) + self.sum_sq_weights = np.zeros(5, dtype=int) def add_data(self, data, *args, **kwargs): @@ -381,7 +389,7 @@ def add_data(self, data, *args, **kwargs): self.mean_e.add_data(2 * i + 1, g2, w) self.counts[i] += w.size self.sum_weights[i] += np.sum(w) - self.sum_weights_sq[i] += np.sum(w**2) + self.sum_sq_weights[i] += np.sum(w**2) return sel_00 @@ -407,16 +415,19 @@ def collect(self, comm=None, allgather=False): if allgather: counts = comm.allreduce(self.counts) sum_weights = comm.allreduce(self.sum_weights) - sum_weights_sq = comm.allreduce(self.sum_weights_sq) + sum_sq_weights = comm.allreduce(self.sum_sq_weights) else: counts = comm.reduce(self.counts) sum_weights = comm.reduce(self.sum_weights) - sum_weights_sq = comm.reduce(self.sum_weights_sq) + sum_sq_weights = comm.reduce(self.sum_sq_weights) + + if comm.rank > 0: + return None, None, None else: counts = self.counts sum_weights = self.sum_weights - sum_weights_sq = self.sum_weights_sq + sum_sq_weights = self.sum_sq_weights # The ordering of these arrays is, from above: # 0: g1 (not actually used here) @@ -429,7 +440,7 @@ def collect(self, comm=None, allgather=False): # 7: g2_2p # 8: g1_2m # 9: g2_2m - + # Compute the mean R components R = np.zeros((2, 2)) R[0, 0] = mean_e[2] - mean_e[4] # g1_1p - g1_1m @@ -439,7 +450,7 @@ def collect(self, comm=None, allgather=False): R /= self.delta_gamma # we just want the count of the 00 base catalog - return R, counts[0], sum_weights[0]**2/sum_weights_sq[0] + return R, counts[0], sum_weights[0]**2/sum_sq_weights[0] class 
LensfitCalculator: @@ -477,6 +488,8 @@ def __init__(self, selector, dec_cut=True, input_m_is_weighted=False): self.C_S = ParallelMean(2) self.count = 0 self.sum_weights = 0 + self.sum_sq_weights = 0 + self.sum_weights_sq = 0 # In KiDS, the additive bias is calculated and removed per North and South field # we have implemented a config to choose whether or not to do this split @@ -514,7 +527,7 @@ def add_data(self, data, *args, **kwargs): # Record the count for this chunk, for summation later self.count += n self.sum_weights += np.sum(w[sel]) - self.sum_weights_sq += np.sum(w[sel]**2) + self.sum_sq_weights += np.sum(w[sel]**2) # Accumulate the calibration quantities so that later we # can compute the weighted mean of the values @@ -562,6 +575,9 @@ def collect(self, comm=None, allgather=False): C: float array c1, c2 additive biases (weighted average of g1 and g2) + Neff: float + Sum(weights)**2 / Sum(weights**2) + """ # The total number of objects is just the # number from all the processes summed together. @@ -569,16 +585,16 @@ def collect(self, comm=None, allgather=False): if allgather: count = comm.allreduce(self.count) sum_weights = comm.allreduce(self.sum_weights) - sum_weights_sq = comm.allreduce(self.sum_weights_sq) + sum_sq_weights = comm.allreduce(self.sum_sq_weights) else: count = comm.reduce(self.count) sum_weights = comm.reduce(self.sum_weights) - sum_weights_sq = comm.reduce(self.sum_weights_sq) + sum_sq_weights = comm.reduce(self.sum_sq_weights) else: count = self.count sum_weights = self.sum_weights - sum_weights_sq = self.sum_weights_sq + sum_sq_weights = self.sum_sq_weights # Collect the weighted means of these numbers. # this collects all the values from the different @@ -588,7 +604,12 @@ def collect(self, comm=None, allgather=False): _, C_N = self.C_N.collect(comm, mode) _, C_S = self.C_S.collect(comm, mode) - return K, C_N, C_S, count, sum_weights**2/sum_weights_sq + if sum_weights is None: + Neff = None + else: + Neff = sum_weights**2/sum_sq_weights + + return K, C_N, C_S, count, Neff class HSCCalculator: @@ -624,7 +645,7 @@ def __init__(self, selector): self.R = ParallelMean(1) self.count = 0 self.sum_weights = 0 - self.sum_weights_sq = 0 + self.sum_sq_weights = 0 def add_data(self, data, *args, **kwargs): @@ -655,7 +676,7 @@ def add_data(self, data, *args, **kwargs): n = w[sel].size self.count += n self.sum_weights += np.sum(w[sel]) - self.sum_weights_sq += np.sum(w[sel]**2) + self.sum_sq_weights += np.sum(w[sel]**2) w = w[sel] @@ -698,23 +719,29 @@ def collect(self, comm=None, allgather=False): if allgather: count = comm.allreduce(self.count) sum_weights = comm.allreduce(self.sum_weights) - sum_weights_sq = comm.allreduce(self.sum_weights_sq) + sum_sq_weights = comm.allreduce(self.sum_sq_weights) else: count = comm.reduce(self.count) sum_weights = comm.reduce(self.sum_weights) - sum_weights_sq = comm.reduce(self.sum_weights_sq) + sum_sq_weights = comm.reduce(self.sum_sq_weights) else: count = self.count sum_weights = self.sum_weights - sum_weights_sq = self.sum_weights_sq + sum_sq_weights = self.sum_sq_weights # Collect the weighted means of these numbers. 
# this collects all the values from the different # processes and over all the chunks of data mode = "allgather" if allgather else "gather" _, R = self.R.collect(comm, mode) _, K = self.K.collect(comm, mode) - return R, K, count, sum_weights**2/sum_weights_sq + + if sum_weights is None: + Neff = None + else: + Neff = sum_weights**2/sum_sq_weights + + return R, K, count, Neff class MeanShearInBins: diff --git a/txpipe/utils/debuggable_dask.py b/txpipe/utils/debuggable_dask.py new file mode 100644 index 000000000..b6dad2098 --- /dev/null +++ b/txpipe/utils/debuggable_dask.py @@ -0,0 +1,40 @@ +import numpy as np +import os + +def from_array(arr, block_size): + return np.array(arr) + +def compute(anything): + return anything, + +def import_dask(actually_numpy=False): + """ + Import dask, but if actually_numpy is True, or if the TX_DASK_DEBUG + environment variable is set, return numpy instead. + + This is to help with debugging dask by replacing it temporarily with + numpy, which is easier to debug. + + Parameters + ---------- + actually_numpy : bool + If True, return numpy instead of dask. + + Returns + ------- + dask : module + The dask module or numpy module. + da : module + The dask.array module or numpy module. + + """ + actually_numpy = actually_numpy or os.environ.get("TX_DASK_DEBUG", "") + if actually_numpy: + print("Faking dask with numpy") + np.from_array = from_array + np.compute = compute + return np, np + else: + import dask + import dask.array as da + return dask, da diff --git a/txpipe/utils/fitting.py b/txpipe/utils/fitting.py index e537657a2..fd9136a6e 100755 --- a/txpipe/utils/fitting.py +++ b/txpipe/utils/fitting.py @@ -1,7 +1,7 @@ from scipy.optimize import curve_fit import numpy as np - +import warnings def fit_straight_line(x, y, y_err=None): """ @@ -25,6 +25,13 @@ def fit_straight_line(x, y, y_err=None): c: float intercept """ + if x.size == 0: + print("ERROR: No data for straight line fit. Returning m=0 c=0") + return 0.0, 0.0, np.array([[1.0, 0.0], [0.0, 1.0]]) + + if x.size != y.size: + raise ValueError("x and y must have the same length") + try: popt, cov = curve_fit(line, x, y, sigma=y_err) except RuntimeError: diff --git a/txpipe/utils/patches.py b/txpipe/utils/patches.py index 493cb5457..80f2935bd 100644 --- a/txpipe/utils/patches.py +++ b/txpipe/utils/patches.py @@ -54,6 +54,9 @@ def __init__( # Different self.columns = columns + if initial_size <= 0: + raise ValueError("Error in patch creation - patch initial size must be > 0") + # This is used to work out the nearest patch center to each galaxy self.ball = sklearn.neighbors.BallTree(patch_centers) @@ -284,6 +287,9 @@ def run(cls, cat, chunk_rows, comm=None): initial_size = max_size // npatch patch_centers = cat.patch_centers + if initial_size == 0: + initial_size = 2 + # make the patchmaker object patchmaker = cls( patch_filenames, diff --git a/txpipe/utils/theory_model.py b/txpipe/utils/theory_model.py index 0bc19ea0b..feeb10076 100644 --- a/txpipe/utils/theory_model.py +++ b/txpipe/utils/theory_model.py @@ -46,7 +46,7 @@ def build_likelihood(build_parameters): bias_systematic = nc.LinearBiasSystematic(sacc_tracer=tracer_name) tr = nc.NumberCounts(sacc_tracer=tracer_name, systematics=[bias_systematic]) else: - raise ValueError(f"Unknown tracer in sacc file, non-3x2pt: {tracer}") + raise ValueError(f"Unknown tracer in sacc file, non-3x2pt: {tracer_name}") sources[tracer_name] = tr # Now that we have all sources we can instantiate all the two-point
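
A note on the central pattern in the new make_dask_* map functions above: da.bincount with a weights argument accumulates per-pixel sums, so dividing the bincount of w*x by the bincount of w gives the weighted mean of x in each pixel. A minimal numpy illustration of that identity (numpy's bincount has the same semantics as the dask version used in the diff; the numbers are made up):

    import numpy as np

    pix = np.array([0, 0, 1, 1, 1])            # pixel index of each object
    x = np.array([1.0, 3.0, 2.0, 4.0, 6.0])    # quantity to map (e.g. g1)
    w = np.array([1.0, 1.0, 2.0, 1.0, 1.0])    # per-object weights

    weight_map = np.bincount(pix, weights=w, minlength=4)
    x_map = np.bincount(pix, weights=w * x, minlength=4)
    with np.errstate(invalid="ignore"):
        x_map /= weight_map                    # weighted mean per pixel; empty pixels become NaN

    # pixel 0: (1 + 3) / 2 = 2.0; pixel 1: (2*2 + 4 + 6) / 4 = 3.5; pixels 2, 3 are empty (NaN)
    assert np.allclose(x_map[:2], [2.0, 3.5])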
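
The config_options fix in txpipe/lens_selector.py above addresses a classic pitfall: dict.update returns None, so chaining it onto copy() leaves config_options set to None. A minimal sketch of the failure and the corrected two-step pattern (the option names are just illustrative):

    base = {"chunk_rows": 10000}

    # Broken: update() mutates in place and returns None,
    # so the chained expression evaluates to None.
    config_options = base.copy().update({"verbose": False})
    assert config_options is None

    # Fixed: copy first, then update the copy in a separate statement.
    config_options = base.copy()
    config_options.update({"verbose": False})
    assert config_options == {"chunk_rows": 10000, "verbose": False}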
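
The Neff value now returned by the calibrator classes in txpipe/utils/calibration_tools.py is the usual effective number count, Neff = (sum of weights)^2 / (sum of squared weights). Two quick sanity checks, purely illustrative:

    import numpy as np

    w = np.array([1.0, 1.0, 1.0, 1.0])   # equal weights: Neff equals the raw count
    assert np.sum(w)**2 / np.sum(w**2) == 4.0

    w = np.array([1.0, 1.0, 0.0, 0.0])   # zero-weight objects do not contribute
    assert np.sum(w)**2 / np.sum(w**2) == 2.0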
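
Finally, a sketch of how the new import_dask helper from txpipe/utils/debuggable_dask.py is meant to be used, based only on what the diff shows: it normally returns the real (dask, dask.array) pair, but setting the TX_DASK_DEBUG environment variable (or passing actually_numpy=True) swaps in numpy with small from_array/compute shims, so dask-based stages can be stepped through serially:

    import os
    from txpipe.utils import import_dask

    dask, da = import_dask()                     # real dask and dask.array

    os.environ["TX_DASK_DEBUG"] = "1"            # any non-empty value triggers the numpy fallback
    dask, da = import_dask()                     # now both names point at numpy
    x = da.from_array([1.0, 2.0, 3.0], "auto")   # shim: simply returns np.array(...)
    (total,) = da.compute(x.sum())               # shim: wraps its argument in a tuple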