From de3173805cf3510d2708efe89d9493426ab5e8bd Mon Sep 17 00:00:00 2001 From: GintasS Date: Tue, 9 Apr 2024 17:23:49 +0200 Subject: [PATCH 1/6] Improved readme with instructions. --- README.md | 319 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 229 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index 5e3f45b..b875917 100755 --- a/README.md +++ b/README.md @@ -1,180 +1,319 @@ -# ntuple-analysis +# ntuple-analysis PYTHON framework for the analysis of [ROOT](https://root.cern/) `TTree` data using [uproot](https://uproot.readthedocs.io/en/latest/) for the IO and [awkward-array](https://awkward-array.org/doc/main/) for the columnar data analysis. -The tool is developed for the analysis of [FastPUPPI](https://github.com/p2l1pfp/FastPUPPI) but should work with any kind of flat ntuples. +The tool was developed for the analysis of [FastPUPPI](https://github.com/p2l1pfp/FastPUPPI) but should work with any kind of flat ntuple. -## Pre-requisites: first time setup +## In this README -The tool can be run on any private machines using just `python`, `pip` and `virtualenvwrapper`. -If you plan to run it on lxplus you might want to look at the point `1` below. +- [Features](#features) +- [Requirements](#requirements) +- [Usage](#usage) + - [First time setup](#first-time-setup) + - [After the first setup](#after-the-first-setup) + - [Running the project on Windows](#running-the-project-on-windows) +- [Main script](#main-script) + - [How does the analyzeNtuples work?](#how-does-analyzentuplespy-work) + - [Configuration files for the analyzeNtuples script](#configuration-files-for-the-analyzentuplespy-script) +- [Submitting to the batch system](#submitting-to-the-batch-system) +- [FAQ](#faq) +- [Contributing](#contributing) -### 1. lxplus setup +## Features -This step is `lxplus` specific, givin access to a more recent `python` and `root` version. -Edit/skip it accordingly for your specific system. +The features -`source setup_lxplus.sh` + - To be added -### 2. install `virtualenvwrapper` +## Requirements -This stetp needs to be done **only once** for your account and can be done with whatever `python` version is in use in the system. +- A computing account on the LXPLUS CERN service. -For some reason the current `CMSSW` scrips seems to deliver an inconsistent setup of `virtualenv` and `virtualenvwrapper`, for this reason we force a new installation in `~/.local` using: +## Usage -`pip install --ignore-installed --user virtualenv==15.1.0 virtualenvwrapper` +### First time setup -For a more complete overview of the procedure you can refer to -`virtualenvwrapper` [installation instructions](https://virtualenvwrapper.readthedocs.io/en/latest/install.html) +1. Log in to a LXPLUS machine. -### 3. setup `virtualenvwrapper` + *A computing account at CERN is required.* + + *This step is LXPLUS specific, giving access to a more recent Python and Root version. Edit/skip it accordingly for your specific system.* -For starting using virtualenvwrapper + ``` + ssh lxplus.cern.ch + ``` + + *After that, create a new directory to store the ntuple-analysis project:* -`source setVirtualEnvWrapper.sh` + ``` + mkdir "your directory" + ``` +2. Clone this repository. -### 4. create a virtualenv for the project + ``` + git clone https://github.com/cerminar/ntuple-analysis.git + ``` + + *Or you can create a fork of this repository if you plan to contribute.* -The **first time** you will have to create the actual instance of the `virtualenv`: +3. CD into your newly created directory: + ``` + cd "your directory" + ``` -``mkvirtualenv --system-site-packages - -p `which python3.9` -r requirements.txt `` + *Again, perform CD into the ntuple-analysis:* + ``` + cd ntuple-analysis + ``` + +4. Clone [Plot Drawing Tools repository](https://github.com/cerminar/plot-drawing-tools) for the Jupyter Notebook support. -[requirements.txt](requirements.txt) + ``` + git clone https://github.com/cerminar/plot-drawing-tools.git + ``` -You can use the file directly using for example: +5. Edit the ```setVirtualEnvWrapper.sh``` script to add the HOME directory of your user. -`pip install -r requirements.txt` + *You can use Nano, Vim or your other favorite editor.* + ``` + vim setVirtualEnvWrapper.sh + ``` -*NOTE*: `python > 3.9` is a requirement. + *Edit the first line of the ```setVirtualEnvWrapper.sh``` script:* + ``` + export WORKON_HOME=/data/YOUR_USERNAME/.virtualenvs + ``` + *where YOUR_USERNAME is your username.* -## Setup after first installation +6. Run the shell script ```setup_lxplus.sh``` to set-up the LXPLUS service. -### 1. lxplus setup + ``` + source setup_lxplus.sh + ``` -This step is `lxplus` specific, givin access to a more recent `python` and `root` version. -Edit/skip it accordingly for your specific system. +7. Run another shell script, ```setVirtualEnvWrapper.sh```, to initialize the virtual environment wrapper. -`source setup_lxplus.sh` + ``` + source setVirtualEnvWrapper.sh + ``` -### 2. setup `virtualenvwrapper` + *To learn more about the Virtual Environment Wrapper, you can take a look at the docs [link](https://virtualenvwrapper.readthedocs.io/en/latest/install.html).* + +8. Create a virtual environment for the project. -For starting using virtualenvwrapper + ``` + mkvirtualenv --system-site-packages -p `which python3.9` -r requirements.txt + ``` + *where venvname is the name of your new virtual environment* -`source setVirtualEnvWrapper.sh` + *If you created a virtual environment in a different way, you can use:* + ``` + pip install -r requirements.txt + ``` + *NOTE: python > 3.9 is a requirement.** + +9. Activate the virtual environment (if it's not active already). -### 3. activate the `virtualenv` + ``` + workon + ``` + *where venvname is the name of your new virtual environment** -After this initial (once in a time) setup is done you can just activate the virtualenv calling: +10. In order to use Jupyter Notebooks, we need to reinstall the ```traitlets``` package. + + ``` + pip uninstall traitlets + ``` -`workon ` + *and then* + ``` + pip install traitlets==5.9.0 + ``` -(`lsvirtualenv` is your friend in case you forgot the name). +11. Install a custom kernel with all of the packages from your virtual environment. + ``` + python3 -m ipykernel install --name --user + ``` + *where venvname is the name of your new virtual environment** -### Conda environment -You can use also conda to install all the dependencies and root + *Source: [here](https://stackoverflow.com/questions/28831854/how-do-i-add-python3-kernel-to-jupyter-ipython ))* -```bash -conda create env_name python=3.11 -conda activate env_name -conda install root #In the conda-forge channel -pip install -r requirements.txt -``` +12. Launch the Jupyter Notebook. + + *You can launch it in the LXPLUS service:* + ``` + jupyter notebook + ``` + *or, if you are using Windows, to access it from Windows:* + ``` + jupyter notebook --no-browser --port=8095 + ``` + +### After the first setup +1. Log in to a LXPLUS machine. -## Running the analysis + *A computing account at CERN is required.* -The main script is `analyzeNtuples.py`: + ``` + ssh lxplus.cern.ch + ``` + + *CD into the root directory of the ntuple-analysis* + ``` + cd "your directory" + ``` +2. Run the shell script ```setup_lxplus.sh``` to set up the LXPLUS service. -`python analyzeNtuples.py --help` + ``` + source setup_lxplus.sh + ``` -An example of how to run it: +3. Run another shell script ```setVirtualEnvWrapper.sh``` to initialize virtual environment wrapper. + + ``` + source setVirtualEnvWrapper.sh + ``` + + *To learn more about the Virtual Environment Wrapper, you can take a look at the docs [link](https://virtualenvwrapper.readthedocs.io/en/latest/install.html).* + +4. Activate the virtual environment (if it's not active already). + + ``` + workon + ``` + *where venvname is the name of your existing virtual environment from the first set-up.* + + *also, ```lsvirtualenv``` is your friend if you forget the name of the virtualenv.* + +### Running the Jupyter Notebook on Windows + +You need to do the following: + +1. Download [Ubuntu](https://apps.microsoft.com/detail/9pdxgncfsczv?hl=en-us&gl=US) for Windows here and install it. +2. Launch the first instance of Ubuntu (referred to here as #1) and complete all the steps on that machine from [here](#first-time-setup). +3. Launch another instance of Ubuntu (referred to here as #2) and create a tunnel between instance #1 and your Windows machine by: + ``` + ssh -L 8099:localhost:8095 YOUR_MACHINE_URL + ``` + *YOUR_MACHINE_URL is the URL of the address to connect to your machine.* + + `NOTE:` If you are going to use the LXPLUS service, it might happen that this tunnel will not be created with the machine that launched Jupyter Notebook with the following: + + ``` + jupyter notebook --no-browser --port=8095 + ``` + + It means that you need a dedicated CentOS machine. +4. Open http://localhost:8099/ in the browser on Windows. + + `NOTE:` When in Jupyter Notebook, it is important to select a kernel that you have created with the python3 -m ipykernel install command on step 10. + + +## Main script + +The main script is ```analyzeNtuples.py```: + +``` +python analyzeNtuples.py --help +``` -`python analyzeNtuples.py -f cfg/hgctps.yaml -i cfg/datasets/ntp_v81.yaml -c tps -s doubleele_flat1to100_PU200 -n 1000 -d 0` +An example of how to run it: +``` +python analyzeNtuples.py -f cfg/hgctps.yaml -i cfg/datasets/ntp_v81.yaml -c tps -s doubleele_flat1to100_PU200 -n 1000 -d 0 +``` -## General idea +### How does analyzeNtuples.py work? -Data are read in `collections` of objects corresponding to an `array` and are processed by `plotters` which creates set of histograms for different `selections` of the data `collections`. +Data are read in `collections` of objects corresponding to an `array` and are +processed by `plotters`, which create sets of histograms for different `selections` of the data `collections`. +### Configuration files for the analyzeNtuples.py script -### Configuration file -The configuration is handled by 2 yaml files. +The configuration is handled by two YAML files. -One specifying +The first YAML (e.g: ```hgctps.yaml```) file specifies: - output directories - versioning of the plots - - collections of samples, i.e. group of samples to be processed homogeneously: for each collection the list of plotters (see below) to be run is provided. + - collections of samples, i.e., groups of samples to be processed homogeneously: + - for each collection, the list of plotters (see below) to be run is provided. -The other prividing +The second YAML file (e.g., ```ntp_v81.yaml```) provides: - details of the input samples (location of the ntuple files) -Example of configuration file can be found in: +An example of the YAML configuration files can be found here: - [cfg/egplots.yaml](cfg/egplots.yaml) - [cfg/datasets/ntp_v92.yaml](cfg/datasets/ntp_v92.yaml) +## Submitting to the batch system + +Note that the script ```analyzeNtuples.py``` can be used to submit the jobs to the HTCondor batch system ,invoking the `-b` option. +A dag configuration is created, and you can actually submit it following the script output. + +### Note about the HADD job. + +For each sample injected into the batch system, a DAG is created. The DAG will submit a hadd command once all the jobs succeed. However, if you don't want to wait (or don't care), you can also submit a condor job that will run periodically, thus dramatically reducing the latency. For example: + +```condor_submit batch_single_empart_guns_tracks_v77/ele_flat2to100_PU0/batch_harvest.sub``` + + +## FAQ -### Reading ntuple branches or creating derived ones +#### - How can you read ntuple branches or create derived branches? -The list of branches to be read and converted to `Awkward Arrays` format is specified in the module +The list of branches to be read and converted to `Awkward Arrays` format is specified in the module. [collections](python/collections.py) Instantiating an object of class `DFCollection`. What is actually read event by event depends anyhow on which plotters are actually instantiated (collections are read on-demand). -### Selecting subsets of object collections +#### - How can you select a subset of an object collection? Selections are defined as strings in the module: [selections](python/selections.py) -Different collections are defined for different objects and/or different purposes. The selections have a `name` whcih is used for the histogram naming (see below). Selections are used by the plotters. -Selections can be combined and retrieved via regular expressions in the configuration of the plotters. +Different collections are defined for different objects and/or different purposes. The selections have a name,whichh is used for the histogram naming (see below). Selections are used by the plotters. Selections can be combined and retrieved via regular expressions in the configuration of the plotters. -### Adding a new plotter -The actual functionality of accessing the objects, filtering them according to the `selections` and filling `histograms` is provided by the plotter classes defined in the module: +### - How can you add a new plotter? + +The actual functionality of accessing the objects, filtering them according to the `selections`, and filling histograms is provided by the plotter classes defined in the module: [plotters](python/plotters.py) -Basic plotters are already available, most likely you just need to instantiate one of them (or a collection of them) using the `DFCollection` instance you are interested in. -Which collection is run for which sample is steered by the configuration file. +Basic plotters are already available; most likely, you just need to instantiate one of them (or a collection of them) using the DFCollection instance you are interested in. Which collection is run for which sample is steered by the configuration file. -The plotters access one or more collections, select them in several different ways, book and fill the histograms (see below). +The plotters access one or more collections, select them in several different ways, book them, and fill in the histograms (see below). -### Adding a new histogram +### - How can you add a new histogram? Histograms are handled in the module: [l1THistos](python/l1THistos.py) -There are different classes of histograms depending on the input object and on the purpose. -To add a new histogram to an existing class it is enough to add it in the corresponding constructor and in the `fill` module. The writing of the histos to files is handled transparently. +There are different classes of histograms depending on the input object and the purpose. To add a new histogram to an existing class, it is enough to add it in the corresponding constructor and in the `fill` module. The writing of the histos to files is handled transparently. The histogram naming follows the convention: `___` This is assumed in all the `plotters` and in the code to actually draw the histograms. +#### Histogram drawing -## Histogram drawing - -Of course you can use your favorite set of tools. I use mine [plot-drawing-tools](https://github.com/cerminar/plot-drawing-tools), which is based on `jupyter notebooks`. - -`cd ntuple-analysis` -`git clone git@github.com:cerminar/plot-drawing-tools.git` -`jupyter-notebook` +Of course, you can use your favorite set of tools. +I use my [plot-drawing-tools](https://github.com/cerminar/plot-drawing-tools), +which is based on `Jupyter notebooks`. -## HELP - -I can't figure out how to do some manipulation using the `awkward array` or `uproot`....you can take a look at examples and play witht the arrays in: -[plot-drawing-tools/blob/master/eventloop-uproot-ak.ipynb](https://github.com/cerminar/plot-drawing-tools/blob/master/eventloop-uproot-ak.ipynb) -## Submitting to the batch system +``` +cd ntuple-analysis +git clone git@github.com:cerminar/plot-drawing-tools.git +jupyter-notebook +``` -Note that the script `analyzeNtuples.py` can be used to submit the jobs to the HTCondor batch system invoking the `-b` option. A dag configuration is created and you can actually submit it following the script output. +## Contributing -### Note about hadd job. -For each sample injected in the batch system a DAG is created. The DAG will submitt an `hadd` command once all the jobs will succeed. -However, if you don't want to wait (or you don't care) you can submit also a condor job that will run hadd periodically thus reducing dramatically the latency. -For example: +If you want to contribute to this project, you are very welcome. Just fork the project, set it up on your own machine, and play with it. If you have any questions, post them on the issues/discussions tab. -`condor_submit batch_single_empart_guns_tracks_v77/ele_flat2to100_PU0/batch_harvest.sub` +Currently, I can't figure out how to do some manipulation using the `awkward array` or `uproot`.You can take a look at examples and play with the arrays in: +[plot-drawing-tools/blob/master/eventloop-uproot-ak.ipynb](https://github.com/cerminar/plot-drawing-tools/blob/master/eventloop-uproot-ak.ipynb) From 4064e5ef78cd78b9228424c739935a594cfcf460 Mon Sep 17 00:00:00 2001 From: GintasS Date: Wed, 24 Apr 2024 16:59:27 +0200 Subject: [PATCH 2/6] Added dask v2 with parallel dask.compute execution as per StackOverflow question. --- python/analyzer.py | 62 +++++++++++++++++++++---- python/boost_hist.py | 69 +++++++++++++-------------- python/l1THistos.py | 105 +++++++++++++++++++++++++----------------- python/tree_reader.py | 43 ++++++++++------- 4 files changed, 177 insertions(+), 102 deletions(-) diff --git a/python/analyzer.py b/python/analyzer.py index 906d11a..7c8bd81 100644 --- a/python/analyzer.py +++ b/python/analyzer.py @@ -1,3 +1,4 @@ +from calendar import c import os import sys import traceback @@ -10,17 +11,28 @@ import python.l1THistos as Histos import python.tree_reader as treereader from python import collections, timecounter +import dask +import random +import time +from dask.distributed import Client, progress # @profile analyze_counter = 1 +import threading def analyze(params, batch_idx=-1): params.print() debug = int(params.debug) - input_files = [] range_ev = (0, params.maxEvents) + + # ------------------------- PRINT KEYS ------------------------------ + + for key, value in params.items(): + print("KEY:", key , " VALUE: ", value) + + # ------------------------- READ FILES ------------------------------ if params.events_per_job == -1: pprint('This is interactive processing...') @@ -47,22 +59,25 @@ def analyze(params, batch_idx=-1): pprint('') files_with_protocol = [fm.get_eos_protocol(file_name) + file_name for file_name in input_files] + + #client = Client(threads_per_worker=4, n_workers=10) + + # -------------------------CALIBRATIONS ------------------------------ calib_manager = calibs.CalibManager() calib_manager.set_calibration_version(params.calib_version) if params.rate_pt_wps: calib_manager.set_pt_wps_version(params.rate_pt_wps) + + # -------------------------BOOK HISTOS------------------------------ output = up.recreate(params.output_filename) hm = Histos.HistoManager() hm.file = output - - # instantiate all the plotters + plotter_collection = [] plotter_collection.extend(params.plotters) - - # -------------------------BOOK HISTOS------------------------------ - + for plotter in plotter_collection: plotter.book_histos() @@ -71,15 +86,20 @@ def analyze(params, batch_idx=-1): if params.weight_file is not None: collection_manager.read_weight_file(params.weight_file) - # -------------------------EVENT LOOP-------------------------------- + # ------------------------- READ .ROOT FILES -------------------------------- - tree_reader = treereader.TreeReader(range_ev, params.maxEvents) + start_time_reading_files = time.time() pprint('') pprint(f"{'events_per_job':<15}: {params.events_per_job}") pprint(f"{'maxEvents':<15}: {params.maxEvents}") pprint(f"{'range_ev':<15}: {range_ev}") pprint('') + print("Creating dask-delayed objects for file reading...") + print("Reading .ROOT files sequentially...") + + tree_reader = treereader.TreeReader(range_ev, params.maxEvents) + for tree_file_name in files_with_protocol: tree_file = up.open(tree_file_name, num_workers=1) pprint(f'opening file: {tree_file_name}') @@ -117,8 +137,30 @@ def analyze(params, batch_idx=-1): tree_file.close() - pprint(f'Writing histos to file {params.output_filename}') + finish_time_reading_files = time.time() + print("Finished reading .ROOT files in sequentially! Took: ", finish_time_reading_files - start_time_reading_files, "s.") + + # ------------------------- COMPUTE HISTOGRAMS -------------------------------- + pprint(f'Computing histograms... {params.output_filename}') + + start_time_computing = time.time() + + all_histos = hm.computeHistos() + + finish_time_computing = time.time() + pprint(f'Computing histograms is FINISHED. Took: {finish_time_computing - start_time_computing} s.') + + # ------------------------- WRITE HISTOGRAMS -------------------------------- + pprint(f'Writing histos to file {params.output_filename}...') + + start_time_writing = time.time() + hm.writeHistos() output.close() + + finish_time_writing= time.time() + print("Writing histos to file is FINISHED. Took: ", finish_time_writing - start_time_writing, " s.") + + # ------------------------- TOTAL ENTRIES OUTPUT -------------------------------- - return tree_reader.n_tot_entries + return tree_reader.n_tot_entries \ No newline at end of file diff --git a/python/boost_hist.py b/python/boost_hist.py index a570535..a68796d 100644 --- a/python/boost_hist.py +++ b/python/boost_hist.py @@ -1,7 +1,22 @@ -import awkward as ak import hist from hist import Hist +import awkward as ak +import dask + +@dask.delayed +def all_histogram_actions_TH1F(params_TH1F, params_TH1F_fill): + # Unpack data from two lists + name, title, nbins, bin_low, bin_high = params_TH1F + array, weights = params_TH1F_fill + + # Create a histogram + histogram = TH1F(name, title, nbins, bin_low, bin_high) + + # Fill it + #print("boost_hist.py: all_histogram_actions_TH1F: ", title, " DATA: ", array) + histogram = fill_1Dhist(histogram, array, weights) + return histogram def TH1F(name, title, nbins, bin_low, bin_high): b_axis_name = 'X' @@ -10,12 +25,13 @@ def TH1F(name, title, nbins, bin_low, bin_high): b_axis_name = title_split[1] b_name = title_split[0] b_label = name + return Hist( - hist.axis.Regular(bins=nbins, start=bin_low, stop=bin_high, name=b_axis_name), - label=b_label, - name=b_name, - storage=hist.storage.Weight() - ) + hist.axis.Regular(bins=nbins, start=bin_low, stop=bin_high, name=b_axis_name), + label=b_label, + name=b_name, + storage=hist.storage.Weight() + ) def TH2F(name, title, x_nbins, x_bin_low, x_bin_high, y_nbins, y_bin_low, y_bin_high): b_x_axis_name = 'X' @@ -27,51 +43,36 @@ def TH2F(name, title, x_nbins, x_bin_low, x_bin_high, y_nbins, y_bin_low, y_bin_ b_y_axis_name = title_split[2] b_name = title_split[0] b_label = name + return Hist( - hist.axis.Regular(bins=x_nbins, start=x_bin_low, stop=x_bin_high, name=b_x_axis_name), - hist.axis.Regular(bins=y_nbins, start=y_bin_low, stop=y_bin_high, name=b_y_axis_name), - label=b_label, + hist.axis.Regular(bins=x_nbins, start=x_bin_low, stop=x_bin_high, name=b_x_axis_name), + hist.axis.Regular(bins=y_nbins, start=y_bin_low, stop=y_bin_high, name=b_y_axis_name), + label=b_label, name=b_name, storage=hist.storage.Weight() - ) - + ) -def TH2F_category(name, title, x_categories, y_nbins, y_bin_low, y_bin_high): - b_x_axis_name = 'X' - b_y_axis_name = 'Y' - title_split = title.split(';') - if len(title_split) > 1: - b_x_axis_name = title_split[1] - if len(title_split) > 2: - b_y_axis_name = title_split[2] - b_name = title_split[0] - b_label = name - return Hist( - hist.axis.StrCategory(x_categories, name=b_x_axis_name), - hist.axis.Regular(bins=y_nbins, start=y_bin_low, stop=y_bin_high, name=b_y_axis_name), - label=b_label, - name=b_name, - storage=hist.storage.Weight() - ) - - -def fill_1Dhist(hist, array, weights=None): +def fill_1Dhist(hist, array, weights=None): flar = ak.drop_none(ak.flatten(array)) + if weights is None: hist.fill(flar, threads=None) # ROOT.fill_1Dhist(hist=hist, array=flar) else: hist.fill(flar, weights) # ROOT.fill_1Dhist(hist=hist, array=flar, weights=weights) - + + return hist + def fill_2Dhist(hist, arrayX, arrayY, weights=None): flar_x = ak.drop_none(ak.flatten(arrayX)) flar_y = ak.drop_none(ak.flatten(arrayY)) - + if weights is None: # ROOT.fill_2Dhist(hist=hist, arrayX=flar_x, arrayY=flar_y) hist.fill(flar_x, flar_y, threads=None) else: # ROOT.fill_2Dhist(hist=hist, arrayX=flar_x, arrayY=flar_y, weights=weights) hist.fill(flar_x, flar_y, weights) - + + return hist \ No newline at end of file diff --git a/python/l1THistos.py b/python/l1THistos.py index a1611c9..6b59be2 100644 --- a/python/l1THistos.py +++ b/python/l1THistos.py @@ -1,5 +1,4 @@ from array import array - import awkward as ak import hist @@ -10,6 +9,8 @@ # import pandas as pd import uproot as up from scipy.special import expit +import dask +import inspect import python.boost_hist as bh @@ -21,22 +22,41 @@ def __init__(self): self.val = None self.histoList = list() self.file = None + self.computed_histos = [[], []] def __str__(self): return f'self{self.val}' - - def addHistos(self, histo): # print 'ADD histo: {}'.format(histo) self.histoList.append(histo) - def writeHistos(self): + def computeHistos(self): + # [0] full_path = directory + "/" + histo name + # [1] histos = all histos + all_histos = [[], []] for histo in self.histoList: - histo.write(self.file) + returned_hists_obj = histo.get_all_histograms() + for full_path, histo in returned_hists_obj: + all_histos[0].append(full_path) + all_histos[1].append(histo) + + self.computed_histos = dask.compute(all_histos) + return self.computed_histos - instance = None + def writeHistos(self): + for path_list, histo_list in self.computed_histos: + histos_len = len(histo_list) + + for index in range(0, histos_len): + self.write_single_histogram(path_list[index], histo_list[index]) + def write_single_histogram(self, path, histogram): + up_writeable_hist = up.to_writable(histogram) + self.file[path] = up_writeable_hist + + instance = None + def __new__(cls): if not HistoManager.instance: HistoManager.instance = HistoManager.__TheManager() @@ -79,22 +99,18 @@ def __init__(self, name, root_file=None, debug=False): hm = HistoManager() hm.addHistos(self) - def write(self, upfile): + def get_all_histograms(self): dir_name = self.__class__.__name__ + histos_in_write = [] for histo in [a for a in dir(self) if a.startswith('h_')]: writeable_hist = getattr(self, histo) - # print (f"Writing {histo} class {writeable_hist.__class__.__name__}") - if 'GraphBuilder' in writeable_hist.__class__.__name__ : - continue - elif 'TH1' in writeable_hist.__class__.__name__ or 'TH2' in writeable_hist.__class__.__name__: - # print('start') - # FIXME: this somehow fails randomply. ROOT not lining the right python??? - upfile[f'{dir_name}/{writeable_hist.GetName()}'] = writeable_hist - # print('ok') - else: - up_writeable_hist = up.to_writable(writeable_hist) - upfile[f'{dir_name}/{writeable_hist.label}'] = up_writeable_hist - + name = writeable_hist.label + + full_path = dir_name + '/' + name + + histos_in_write.append((full_path, writeable_hist)) + return histos_in_write + # def normalize(self, norm): # className = self.__class__.__name__ # ret = className() @@ -579,49 +595,56 @@ def fill(self, cl3ds): class EGHistos(BaseHistos): def __init__(self, name, root_file=None, debug=False): - if not root_file: - self.h_pt = bh.TH1F(f'{name}_pt', 'EG Pt (GeV); p_{T} [GeV]', 100, 0, 100) - self.h_eta = bh.TH1F(f'{name}_eta', 'EG eta; #eta;', 100, -4, 4) - self.h_energy = bh.TH1F(f'{name}_energy', 'EG energy (GeV); E [GeV]', 1000, 0, 1000) - self.h_hwQual = bh.TH1F(f'{name}_hwQual', 'EG energy (GeV); hwQual', 5, 0, 5) - self.h_tkIso = bh.TH1F(f'{name}_tkIso', 'Iso; rel-iso_{tk}', 100, 0, 2) - self.h_pfIso = bh.TH1F(f'{name}_pfIso', 'Iso; rel-iso_{pf}', 100, 0, 2) - self.h_tkIsoPV = bh.TH1F(f'{name}_tkIsoPV', 'Iso; rel-iso^{PV}_{tk}', 100, 0, 2) - self.h_pfIsoPV = bh.TH1F(f'{name}_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2) - self.h_n = bh.TH1F(f'{name}_n', '# objects per event', 100, 0, 100) - self.h_compBdt = bh.TH1F(f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1) + #print("INIT EG HISTOS") + self.name = name BaseHistos.__init__(self, name, root_file, debug) def fill(self, egs): + #print("l1Histos.py: FILLING EG HISTOS... ") weight = None if 'weight' in egs.fields: weight = egs.weight - bh.fill_1Dhist(hist=self.h_pt, array=egs.pt, weights=weight) - bh.fill_1Dhist(hist=self.h_eta, array=egs.eta, weights=weight) + #data = egs.pt.compute() + #print("l1Histos.py: bh is", type(bh), " DATA: ", data) + + #tack = inspect.stack() + #print("l2THistos.py: TACK ", tack) + + name = self.name + + #self.h_energy = all_histogram_actions_TH1F([f'{name}_energy', 'EG energy (GeV); E [GeV]', 1000, 0, 1000], [egs.energy, weight]) + + #bh.fill_1Dhist(hist=self.h_pt, array=egs.pt, weights=weight) + self.h_pt = bh.all_histogram_actions_TH1F([f'{name}_pt', 'EG Pt (GeV); p_{T} [GeV]', 100, 0, 100], [egs.pt, weight]) + #bh.fill_1Dhist(hist=self.h_eta, array=egs.eta, weights=weight) + self.h_eta = bh.all_histogram_actions_TH1F([f'{name}_eta', 'EG eta; #eta;', 100, -4, 4], [egs.eta, weight]) # bh.fill_1Dhist(hist=self.h_energy, array=egs.energy, weights=weight) - bh.fill_1Dhist(hist=self.h_hwQual, array=egs.hwQual, weights=weight) + #bh.fill_1Dhist(hist=self.h_hwQual, array=egs.hwQual, weights=weight) + self.h_hwQual = bh.all_histogram_actions_TH1F([f'{name}_hwQual', 'EG energy (GeV); hwQual', 5, 0, 5], [egs.hwQual, weight]) if 'tkIso' in egs.fields: - bh.fill_1Dhist(hist=self.h_tkIso, array=egs.tkIso, weights=weight) + self.h_tkIso = bh.all_histogram_actions_TH1F([f'{name}_tkIso', 'Iso; rel-iso_{tk}', 100, 0, 2], [egs.tkIso, weight]) if 'pfIso' in egs.fields: - bh.fill_1Dhist(hist=self.h_pfIso, array=egs.pfIso, weights=weight) + self.h_pfIso = bh.all_histogram_actions_TH1F([f'{name}_pfIso', 'Iso; rel-iso_{pf}', 100, 0, 2], [egs.pfIso, weight]) if 'tkIsoPV' in egs.fields: - bh.fill_1Dhist(hist=self.h_tkIsoPV, array=egs.tkIsoPV, weights=weight) - bh.fill_1Dhist(hist=self.h_pfIsoPV, array=egs.pfIsoPV, weights=weight) + self.h_tkIsoPV = bh.all_histogram_actions_TH1F([f'{name}_tkIsoPV', 'Iso; rel-iso^{PV}_{tk}', 100, 0, 2], [egs.tkIsoPV, weight]) + self.h_pfIsoPV = bh.all_histogram_actions_TH1F([f'{name}_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2], [egs.pfIsoPV, weight]) if 'compBDTScore' in egs.fields: - bh.fill_1Dhist(hist=self.h_compBdt, array=egs.compBDTScore, weights=weight) + self.h_compBdt = bh.all_histogram_actions_TH1F([f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1], [egs.compBDTScore, weight]) if 'idScore' in egs.fields: - bh.fill_1Dhist(hist=self.h_compBdt, array=expit(egs.idScore), weights=weight) + self.h_compBdt = bh.all_histogram_actions_TH1F([f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1], [expit(egs.idScore), weight]) + # print(ak.count(egs.pt, axis=1)) # print(egs.pt.type.show()) # print(ak.count(egs.pt, axis=1).type.show()) - self.h_n.fill(ak.count(egs.pt, axis=1)) + #self.h_n.fill(ak.count(egs.pt, axis=1)) # bh.fill_1Dhist(hist=self.h_n, array=ak.count(egs.pt, axis=1), weights=weight) # self.h_n.Fill() - + #self.h_n = bh.all_histogram_actions_TH1F([f'{name}_n', '# objects per event', 100, 0, 100], [ak.count(egs.pt, axis=1), weight]) def add_histos(self): + # PAKEITIMAI PAKEITIMAI KETURIOS EILUTES APACOJE self.h_pt.Add(self.h_pt_temp.GetValue()) self.h_eta.Add(self.h_eta_temp.GetValue()) self.h_energy.Add(self.h_energy_temp.GetValue()) diff --git a/python/tree_reader.py b/python/tree_reader.py index 871b3ed..2cc7730 100644 --- a/python/tree_reader.py +++ b/python/tree_reader.py @@ -1,10 +1,14 @@ import datetime import gc +import profile import resource import awkward as ak import vector +import coffea +from coffea.nanoevents import NanoEventsFactory, NanoAODSchema, BaseSchema + vector.register_awkward() class TreeReader: @@ -88,13 +92,12 @@ def dump_garbage(self): if len(s) > 80: s = s[:80] print (type(x),'\n ', s) - def getDataFrame(self, prefix, entry_block, fallback=None): branches = [br for br in self._branches if br.startswith(f'{prefix}_') and br != f'{prefix}_n'] names = ['_'.join(br.split('_')[1:]) for br in branches] - name_map = dict(zip(names, branches, strict=False)) + name_map = dict(zip(names, branches)) if len(branches) == 0: if fallback is not None: return self.getDataFrame(prefix=fallback, entry_block=entry_block) @@ -102,21 +105,28 @@ def getDataFrame(self, prefix, entry_block, fallback=None): print(f'stored branch prefixes are: {prefs}') raise ValueError(f'[TreeReader::getDataFrame] No branches with prefix: {prefix}') - akarray = self.tree.arrays(names, - library='ak', - aliases=name_map, - entry_start=self.file_entry, - entry_stop=self.file_entry+entry_block) + dask_akarray = NanoEventsFactory.from_root( + self.tree, + schemaclass=NanoAODSchema).events() - # print(akarray) - records = {} - for field in akarray.fields: - records[field] = akarray[field] + #akarray = self.tree.arrays(names, library='ak', aliases=name_map, entry_start=self.file_entry, entry_stop=self.file_entry+entry_block) + #print("[0] prefix to select: ", prefix) + + print("ACTIVATING TREE READER... ") + + dask_akarray = dask_akarray[prefix] + #print("[1] Selected fields from prefix", dask_akarray.fields) - if 'pt' in names and 'eta' in names and 'phi' in names: - if 'mass' not in names and 'energy' not in names: - records['mass'] = 0.*akarray['pt'] - return vector.zip(records) + dask_akarray = dask_akarray[names] + + #print("[2] specific fields with names", dask_akarray.fields) + dask_akarray = dask_akarray[self.file_entry : self.file_entry + entry_block] + + dask_akarray = dask_akarray.persist() + + records = {} + for field in dask_akarray.fields: + records[field] = dask_akarray[field] return ak.zip(records) @@ -124,5 +134,4 @@ def getDataFrame(self, prefix, entry_block, fallback=None): # ele_rec = ak.zip({'pt': tkele.pt, 'eta': tkele.eta, 'phi': tkele.phi}, with_name="pippo") # this would allow to handle the records and assign behaviours.... - # return akarray - + # return akarray \ No newline at end of file From c8951fe20c841468891fb80eda3cb60506675754 Mon Sep 17 00:00:00 2001 From: GintasS Date: Wed, 24 Apr 2024 17:03:15 +0200 Subject: [PATCH 3/6] Removed unused comment. --- python/l1THistos.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/l1THistos.py b/python/l1THistos.py index 6b59be2..f626c29 100644 --- a/python/l1THistos.py +++ b/python/l1THistos.py @@ -644,7 +644,6 @@ def fill(self, egs): #self.h_n = bh.all_histogram_actions_TH1F([f'{name}_n', '# objects per event', 100, 0, 100], [ak.count(egs.pt, axis=1), weight]) def add_histos(self): - # PAKEITIMAI PAKEITIMAI KETURIOS EILUTES APACOJE self.h_pt.Add(self.h_pt_temp.GetValue()) self.h_eta.Add(self.h_eta_temp.GetValue()) self.h_energy.Add(self.h_energy_temp.GetValue()) From 1f564dd371e1d3d8470147123617b76680401c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gintautas=20=C5=A0vedas?= Date: Mon, 29 Apr 2024 13:11:33 +0200 Subject: [PATCH 4/6] delaying other histogram methods --- python/boost_hist.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/boost_hist.py b/python/boost_hist.py index a68796d..d681228 100644 --- a/python/boost_hist.py +++ b/python/boost_hist.py @@ -15,9 +15,9 @@ def all_histogram_actions_TH1F(params_TH1F, params_TH1F_fill): # Fill it #print("boost_hist.py: all_histogram_actions_TH1F: ", title, " DATA: ", array) - histogram = fill_1Dhist(histogram, array, weights) return histogram +@dask.delayed def TH1F(name, title, nbins, bin_low, bin_high): b_axis_name = 'X' title_split = title.split(';') @@ -52,6 +52,7 @@ def TH2F(name, title, x_nbins, x_bin_low, x_bin_high, y_nbins, y_bin_low, y_bin_ storage=hist.storage.Weight() ) +@dask.delayed def fill_1Dhist(hist, array, weights=None): flar = ak.drop_none(ak.flatten(array)) @@ -75,4 +76,4 @@ def fill_2Dhist(hist, arrayX, arrayY, weights=None): # ROOT.fill_2Dhist(hist=hist, arrayX=flar_x, arrayY=flar_y, weights=weights) hist.fill(flar_x, flar_y, weights) - return hist \ No newline at end of file + return hist From 9781a9d885522da11fe98b4c290d629abc1a69a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gintautas=20=C5=A0vedas?= Date: Mon, 29 Apr 2024 13:12:41 +0200 Subject: [PATCH 5/6] delaying continuous fill --- python/l1THistos.py | 53 +++++++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/python/l1THistos.py b/python/l1THistos.py index f626c29..68dc162 100644 --- a/python/l1THistos.py +++ b/python/l1THistos.py @@ -595,8 +595,17 @@ def fill(self, cl3ds): class EGHistos(BaseHistos): def __init__(self, name, root_file=None, debug=False): - #print("INIT EG HISTOS") - self.name = name + if not root_file: + self.h_pt = bh.TH1F(f'{name}_pt', 'EG Pt (GeV); p_{T} [GeV]', 100, 0, 100) + self.h_eta = bh.TH1F(f'{name}_eta', 'EG eta; #eta;', 100, -4, 4) + self.h_energy = bh.TH1F(f'{name}_energy', 'EG energy (GeV); E [GeV]', 1000, 0, 1000) + self.h_hwQual = bh.TH1F(f'{name}_hwQual', 'EG energy (GeV); hwQual', 5, 0, 5) + self.h_tkIso = bh.TH1F(f'{name}_tkIso', 'Iso; rel-iso_{tk}', 100, 0, 2) + self.h_pfIso = bh.TH1F(f'{name}_pfIso', 'Iso; rel-iso_{pf}', 100, 0, 2) + self.h_tkIsoPV = bh.TH1F(f'{name}_tkIsoPV', 'Iso; rel-iso^{PV}_{tk}', 100, 0, 2) + self.h_pfIsoPV = bh.TH1F(f'{name}_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2) + self.h_n = bh.TH1F(f'{name}_n', '# objects per event', 100, 0, 100) + self.h_compBdt = bh.TH1F(f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1) BaseHistos.__init__(self, name, root_file, debug) @@ -605,44 +614,27 @@ def fill(self, egs): weight = None if 'weight' in egs.fields: weight = egs.weight - - #data = egs.pt.compute() - #print("l1Histos.py: bh is", type(bh), " DATA: ", data) - - #tack = inspect.stack() - #print("l2THistos.py: TACK ", tack) - - name = self.name - - #self.h_energy = all_histogram_actions_TH1F([f'{name}_energy', 'EG energy (GeV); E [GeV]', 1000, 0, 1000], [egs.energy, weight]) + + self.h_pt = bh.fill_1Dhist(hist=self.h_pt, array=egs.pt, weights=weight) - #bh.fill_1Dhist(hist=self.h_pt, array=egs.pt, weights=weight) - self.h_pt = bh.all_histogram_actions_TH1F([f'{name}_pt', 'EG Pt (GeV); p_{T} [GeV]', 100, 0, 100], [egs.pt, weight]) - #bh.fill_1Dhist(hist=self.h_eta, array=egs.eta, weights=weight) - self.h_eta = bh.all_histogram_actions_TH1F([f'{name}_eta', 'EG eta; #eta;', 100, -4, 4], [egs.eta, weight]) + self.h_eta = bh.fill_1Dhist(hist=self.h_eta, array=egs.eta, weights=weight) # bh.fill_1Dhist(hist=self.h_energy, array=egs.energy, weights=weight) - #bh.fill_1Dhist(hist=self.h_hwQual, array=egs.hwQual, weights=weight) - self.h_hwQual = bh.all_histogram_actions_TH1F([f'{name}_hwQual', 'EG energy (GeV); hwQual', 5, 0, 5], [egs.hwQual, weight]) + self.h_hwQual = bh.fill_1Dhist(hist=self.h_hwQual, array=egs.hwQual, weights=weight) if 'tkIso' in egs.fields: - self.h_tkIso = bh.all_histogram_actions_TH1F([f'{name}_tkIso', 'Iso; rel-iso_{tk}', 100, 0, 2], [egs.tkIso, weight]) + self.h_tkIso = bh.fill_1Dhist(hist=self.h_tkIso, array=egs.tkIso, weights=weight) if 'pfIso' in egs.fields: - self.h_pfIso = bh.all_histogram_actions_TH1F([f'{name}_pfIso', 'Iso; rel-iso_{pf}', 100, 0, 2], [egs.pfIso, weight]) + self.h_pfIso = bh.fill_1Dhist(hist=self.h_pfIso, array=egs.pfIso, weights=weight) if 'tkIsoPV' in egs.fields: - self.h_tkIsoPV = bh.all_histogram_actions_TH1F([f'{name}_tkIsoPV', 'Iso; rel-iso^{PV}_{tk}', 100, 0, 2], [egs.tkIsoPV, weight]) - self.h_pfIsoPV = bh.all_histogram_actions_TH1F([f'{name}_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2], [egs.pfIsoPV, weight]) + self.h_tkIsoPV = bh.fill_1Dhist(hist=self.h_tkIsoPV, array=egs.tkIsoPV, weights=weight) + self.h_pfIsoPV = bh.fill_1Dhist(hist=self.h_pfIsoPV, array=egs.pfIsoPV, weights=weight) if 'compBDTScore' in egs.fields: - self.h_compBdt = bh.all_histogram_actions_TH1F([f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1], [egs.compBDTScore, weight]) + self.h_compBdt = bh.fill_1Dhist(hist=self.h_compBdt, array=egs.compBDTScore, weights=weight) if 'idScore' in egs.fields: - self.h_compBdt = bh.all_histogram_actions_TH1F([f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1], [expit(egs.idScore), weight]) + self.h_compBdt = bh.fill_1Dhist(hist=self.h_compBdt, array=expit(egs.idScore), weights=weight) - # print(ak.count(egs.pt, axis=1)) - # print(egs.pt.type.show()) - # print(ak.count(egs.pt, axis=1).type.show()) - #self.h_n.fill(ak.count(egs.pt, axis=1)) - # bh.fill_1Dhist(hist=self.h_n, array=ak.count(egs.pt, axis=1), weights=weight) - # self.h_n.Fill() #self.h_n = bh.all_histogram_actions_TH1F([f'{name}_n', '# objects per event', 100, 0, 100], [ak.count(egs.pt, axis=1), weight]) + def add_histos(self): self.h_pt.Add(self.h_pt_temp.GetValue()) self.h_eta.Add(self.h_eta_temp.GetValue()) @@ -653,7 +645,6 @@ def add_histos(self): # self.h_tkIsoPV = bh.TH1F(name+'_tkIsoPV', 'Iso; rel-iso^{PV}_{tk}', 100, 0, 2) # self.h_pfIsoPV = bh.TH1F(name+'_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2) - class DecTkHistos(BaseHistos): def __init__(self, name, root_file=None, debug=False): if not root_file: From 93eb55c490132f1eeb5dc884c7fdf5746c31e760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gintautas=20=C5=A0vedas?= Date: Mon, 29 Apr 2024 14:11:55 +0200 Subject: [PATCH 6/6] added parallelized reading of files --- python/analyzer.py | 84 ++++++++++++++++++++-------------------------- 1 file changed, 36 insertions(+), 48 deletions(-) diff --git a/python/analyzer.py b/python/analyzer.py index 7c8bd81..891669a 100644 --- a/python/analyzer.py +++ b/python/analyzer.py @@ -89,53 +89,15 @@ def analyze(params, batch_idx=-1): # ------------------------- READ .ROOT FILES -------------------------------- start_time_reading_files = time.time() - pprint('') - pprint(f"{'events_per_job':<15}: {params.events_per_job}") - pprint(f"{'maxEvents':<15}: {params.maxEvents}") - pprint(f"{'range_ev':<15}: {range_ev}") - pprint('') - - print("Creating dask-delayed objects for file reading...") - print("Reading .ROOT files sequentially...") - - tree_reader = treereader.TreeReader(range_ev, params.maxEvents) - - for tree_file_name in files_with_protocol: - tree_file = up.open(tree_file_name, num_workers=1) - pprint(f'opening file: {tree_file_name}') - pprint(f' . tree name: {params.tree_name}') - - ttree = tree_file[params.tree_name] - - tree_reader.setTree(ttree) - - while tree_reader.next(debug): - try: - collection_manager.read(tree_reader, debug) - - for plotter in plotter_collection: - plotter.fill_histos_event(tree_reader.file_entry, debug=debug) - - if ( - batch_idx != -1 - and timecounter.counter.started() - and tree_reader.global_entry % 100 == 0 - and timecounter.counter.job_flavor_time_left(params.htc_jobflavor) < 5 * 60 - ): - tree_reader.printEntry() - pprint(' less than 5 min left for batch slot: exit event loop!') - timecounter.counter.job_flavor_time_perc(params.htc_jobflavor) - break - - except Exception as inst: - tree_reader.printEntry() - pprint(f'[EXCEPTION OCCURRED:] {inst!s}') - pprint('Unexpected error:', sys.exc_info()[0]) - traceback.print_exc() - tree_file.close() - sys.exit(200) - - tree_file.close() + + compute_files = [] + for file in files_with_protocol: + single_tree_reader = treereader.TreeReader(range_ev, params.maxEvents) + compute_files.append(process_file(file, single_tree_reader, debug, collection_manager, plotter_collection, hm)) + + results = dask.compute(*compute_files) + + n_tot_entries = sum(results) finish_time_reading_files = time.time() print("Finished reading .ROOT files in sequentially! Took: ", finish_time_reading_files - start_time_reading_files, "s.") @@ -163,4 +125,30 @@ def analyze(params, batch_idx=-1): # ------------------------- TOTAL ENTRIES OUTPUT -------------------------------- - return tree_reader.n_tot_entries \ No newline at end of file + return n_tot_entries + +@dask.delayed +def process_file(filename, tree_reader, debug, collection_manager, plotter_collection, hm): + print("ACTIVE PROCESS_FILE") + + tree_file = up.open(filename, num_workers=1) + pprint(f'opening file: {filename}') + + ttree = tree_file['Events'] + tree_reader.setTree(ttree) + + while tree_reader.next(debug): + try: + collection_manager.read(tree_reader, debug) + + for plotter in plotter_collection: + #print("DOING PLOTTER", plotter) + plotter.fill_histos_event(tree_reader.file_entry, debug=debug) + + except Exception as inst: + pprint(f'[EXCEPTION OCCURRED:] {inst!s}') + pprint('Unexpected error:', sys.exc_info()[0]) + + tree_file.close() + + return tree_reader.n_tot_entries