Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fastpuppi v1 dask integration #8

Open
wants to merge 10 commits into
base: fastpuppi_v1_dask
Choose a base branch
from
109 changes: 69 additions & 40 deletions python/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from calendar import c
import os
import sys
import traceback
Expand All @@ -10,17 +11,28 @@
import python.l1THistos as Histos
import python.tree_reader as treereader
from python import collections, timecounter
import dask
import random
import time
from dask.distributed import Client, progress

# @profile
analyze_counter = 1

import threading

def analyze(params, batch_idx=-1):
params.print()
debug = int(params.debug)

input_files = []
range_ev = (0, params.maxEvents)

# ------------------------- PRINT KEYS ------------------------------

for key, value in params.items():
print("KEY:", key , " VALUE: ", value)

# ------------------------- READ FILES ------------------------------

if params.events_per_job == -1:
pprint('This is interactive processing...')
Expand All @@ -47,22 +59,25 @@ def analyze(params, batch_idx=-1):
pprint('')

files_with_protocol = [fm.get_eos_protocol(file_name) + file_name for file_name in input_files]

client = Client(threads_per_worker=4, n_workers=1)

# -------------------------CALIBRATIONS ------------------------------

calib_manager = calibs.CalibManager()
calib_manager.set_calibration_version(params.calib_version)
if params.rate_pt_wps:
calib_manager.set_pt_wps_version(params.rate_pt_wps)

# -------------------------BOOK HISTOS------------------------------

output = up.recreate(params.output_filename)
hm = Histos.HistoManager()
hm.file = output

# instantiate all the plotters

plotter_collection = []
plotter_collection.extend(params.plotters)

# -------------------------BOOK HISTOS------------------------------


for plotter in plotter_collection:
plotter.book_histos()

Expand All @@ -73,52 +88,66 @@ def analyze(params, batch_idx=-1):

# -------------------------EVENT LOOP--------------------------------

tree_reader = treereader.TreeReader(range_ev, params.maxEvents)
pprint('')
pprint(f"{'events_per_job':<15}: {params.events_per_job}")
pprint(f"{'maxEvents':<15}: {params.maxEvents}")
pprint(f"{'range_ev':<15}: {range_ev}")
pprint('')

for tree_file_name in files_with_protocol:
tree_file = up.open(tree_file_name, num_workers=1)
pprint(f'opening file: {tree_file_name}')
pprint(f' . tree name: {params.tree_name}')
print("Creating dask-delayed objects for file reading...")
files_dask_delayed = []
tree_reader_instances = []
for file in files_with_protocol:
single_tree_reader = treereader.TreeReader(range_ev, params.maxEvents)
single_file_dd = dask.delayed(process_file(file, params, single_tree_reader, debug, collection_manager, plotter_collection, hm))
files_dask_delayed.append(single_file_dd)
tree_reader_instances.append(single_tree_reader)

start_time_reading_files = time.time()
print("Reading .ROOT files in parallel...")
dask.compute(files_dask_delayed)
finish_time_reading_files = time.time()
print("Finished reading .ROOT files in parallel! Took: ", finish_time_reading_files - start_time_reading_files, " s.")

ttree = tree_file[params.tree_name]
# ------------------------- WRITING HISTOGRAMS --------------------------------

tree_reader.setTree(ttree)
pprint(f'Writing histos to file {params.output_filename}')
start_time_writing_histograms = time.time()

while tree_reader.next(debug):
try:
collection_manager.read(tree_reader, debug)
hm.writeHistos()
output.close()

finish_time_writing_histograms = time.time()
print("Writing histos to file FINISHED! Took: ", finish_time_writing_histograms - start_time_writing_histograms, " s.")

for plotter in plotter_collection:
plotter.fill_histos_event(tree_reader.file_entry, debug=debug)
# ------------------------- TOTAL ENTRIES OUTPUT --------------------------------

if (
batch_idx != -1
and timecounter.counter.started()
and tree_reader.global_entry % 100 == 0
and timecounter.counter.job_flavor_time_left(params.htc_jobflavor) < 5 * 60
):
tree_reader.printEntry()
pprint(' less than 5 min left for batch slot: exit event loop!')
timecounter.counter.job_flavor_time_perc(params.htc_jobflavor)
break
total_entries = 0
for tree_reader_instance in tree_reader_instances:
total_entries += tree_reader_instance.n_tot_entries

except Exception as inst:
tree_reader.printEntry()
pprint(f'[EXCEPTION OCCURRED:] {inst!s}')
pprint('Unexpected error:', sys.exc_info()[0])
traceback.print_exc()
tree_file.close()
sys.exit(200)
return total_entries

tree_file.close()
def process_file(file_name, params, tree_reader, debug, collection_manager, plotter_collection, hm):
    """Read one ROOT file and fill the booked histograms for every event in it.

    Opens `file_name` with uproot, attaches the tree named `params.tree_name`
    to `tree_reader`, and loops over event blocks, letting each plotter fill
    its histograms. Designed to be wrapped in `dask.delayed` by the caller.

    Args:
        file_name: path (with protocol prefix) of the ROOT file to process.
        params: job configuration; only `tree_name` is read here.
        tree_reader: per-file TreeReader instance; mutated in place
            (its `n_tot_entries` is read by the caller after compute).
        debug: integer debug level forwarded to the reader and plotters.
        collection_manager: reads the event collections for each block.
        plotter_collection: plotters whose `fill_histos_event` is invoked.
        hm: histogram manager; currently unused here but kept for
            interface compatibility with the caller.

    Returns:
        `tree_reader.n_tot_entries` on success, or 1 if an exception is
        raised while processing an event block.
    """
    tree_file = up.open(file_name, num_workers=1)
    pprint(f'[process_file] opening file: {file_name}')
    pprint(f'[process_file] . tree name: {params.tree_name}')

    # Close the file on every exit path: in the original code the close()
    # call sat after an unconditional return and never executed, leaking
    # one open file handle per processed file.
    try:
        ttree = tree_file[params.tree_name]
        tree_reader.setTree(ttree)

        while tree_reader.next(debug):
            try:
                collection_manager.read(tree_reader, debug)

                for plotter in plotter_collection:
                    plotter.fill_histos_event(tree_reader.file_entry, debug=debug)

            except Exception as inst:
                pprint(f'[EXCEPTION OCCURRED:] {inst!s}')
                pprint('Unexpected error:', sys.exc_info()[0])
                return 1

        return tree_reader.n_tot_entries
    finally:
        tree_file.close()
55 changes: 18 additions & 37 deletions python/boost_hist.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import awkward as ak
import hist.dask as dah
import hist
from hist import Hist

import awkward as ak

def TH1F(name, title, nbins, bin_low, bin_high):
b_axis_name = 'X'
Expand All @@ -10,12 +9,13 @@ def TH1F(name, title, nbins, bin_low, bin_high):
b_axis_name = title_split[1]
b_name = title_split[0]
b_label = name
return Hist(
hist.axis.Regular(bins=nbins, start=bin_low, stop=bin_high, name=b_axis_name),
label=b_label,
name=b_name,
storage=hist.storage.Weight()
)

return hist.dask.Hist(
hist.axis.Regular(bins=nbins, start=bin_low, stop=bin_high, name=b_axis_name),
label=b_label,
name=b_name,
storage=hist.storage.Weight()
)

def TH2F(name, title, x_nbins, x_bin_low, x_bin_high, y_nbins, y_bin_low, y_bin_high):
b_x_axis_name = 'X'
Expand All @@ -27,51 +27,32 @@ def TH2F(name, title, x_nbins, x_bin_low, x_bin_high, y_nbins, y_bin_low, y_bin_
b_y_axis_name = title_split[2]
b_name = title_split[0]
b_label = name
return Hist(
hist.axis.Regular(bins=x_nbins, start=x_bin_low, stop=x_bin_high, name=b_x_axis_name),
hist.axis.Regular(bins=y_nbins, start=y_bin_low, stop=y_bin_high, name=b_y_axis_name),
label=b_label,
name=b_name,
storage=hist.storage.Weight()
)


def TH2F_category(name, title, x_categories, y_nbins, y_bin_low, y_bin_high):
GintasS marked this conversation as resolved.
Show resolved Hide resolved
b_x_axis_name = 'X'
b_y_axis_name = 'Y'
title_split = title.split(';')
if len(title_split) > 1:
b_x_axis_name = title_split[1]
if len(title_split) > 2:
b_y_axis_name = title_split[2]
b_name = title_split[0]
b_label = name
return Hist(
hist.axis.StrCategory(x_categories, name=b_x_axis_name),
hist.axis.Regular(bins=y_nbins, start=y_bin_low, stop=y_bin_high, name=b_y_axis_name),
label=b_label,

return hist.dask.Hist(
hist.axis.Regular(bins=x_nbins, start=x_bin_low, stop=x_bin_high, name=b_x_axis_name),
hist.axis.Regular(bins=y_nbins, start=y_bin_low, stop=y_bin_high, name=b_y_axis_name),
label=b_label,
name=b_name,
storage=hist.storage.Weight()
)


def fill_1Dhist(hist, array, weights=None):
    """Fill a 1D histogram from a (possibly jagged) awkward array.

    Args:
        hist: a `hist.Hist` (or dask equivalent) with one Regular axis.
        array: awkward array of values; flattened and None-stripped first.
        weights: optional per-entry weights matching the flattened values.
    """
    flar = ak.drop_none(ak.flatten(array))

    if weights is None:
        hist.fill(flar, threads=None)
    else:
        # Weights must go through the `weight=` keyword: a second positional
        # argument is interpreted by hist.Hist.fill as data for a second
        # axis, which is wrong for a 1D histogram.
        hist.fill(flar, weight=weights)

GintasS marked this conversation as resolved.
Show resolved Hide resolved
def fill_2Dhist(hist, arrayX, arrayY, weights=None):
    """Fill a 2D histogram from two (possibly jagged) awkward arrays.

    Args:
        hist: a `hist.Hist` (or dask equivalent) with two Regular axes.
        arrayX: awkward array of X values; flattened and None-stripped.
        arrayY: awkward array of Y values; flattened and None-stripped.
        weights: optional per-entry weights matching the flattened values.
    """
    flar_x = ak.drop_none(ak.flatten(arrayX))
    flar_y = ak.drop_none(ak.flatten(arrayY))

    if weights is None:
        hist.fill(flar_x, flar_y, threads=None)
    else:
        # Weights must go through the `weight=` keyword: a third positional
        # argument would be taken by hist.Hist.fill as data for a third
        # axis, which this 2D histogram does not have.
        hist.fill(flar_x, flar_y, weight=weights)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing new-line at the end of file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean?

7 changes: 5 additions & 2 deletions python/l1THistos.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def write(self, upfile):
for histo in [a for a in dir(self) if a.startswith('h_')]:
writeable_hist = getattr(self, histo)
# print (f"Writing {histo} class {writeable_hist.__class__.__name__}")
name = writeable_hist.label
writeable_hist = writeable_hist.compute()

if 'GraphBuilder' in writeable_hist.__class__.__name__ :
continue
elif 'TH1' in writeable_hist.__class__.__name__ or 'TH2' in writeable_hist.__class__.__name__:
Expand All @@ -93,7 +96,7 @@ def write(self, upfile):
# print('ok')
else:
up_writeable_hist = up.to_writable(writeable_hist)
upfile[f'{dir_name}/{writeable_hist.label}'] = up_writeable_hist
upfile[f'{dir_name}/{name}'] = up_writeable_hist

# def normalize(self, norm):
# className = self.__class__.__name__
Expand Down Expand Up @@ -590,7 +593,7 @@ def __init__(self, name, root_file=None, debug=False):
self.h_pfIsoPV = bh.TH1F(f'{name}_pfIsoPV', 'Iso; rel-iso^{PV}_{pf}', 100, 0, 2)
self.h_n = bh.TH1F(f'{name}_n', '# objects per event', 100, 0, 100)
self.h_compBdt = bh.TH1F(f'{name}_compBdt', 'BDT Score Comp ID', 50, 0, 1)

GintasS marked this conversation as resolved.
Show resolved Hide resolved
BaseHistos.__init__(self, name, root_file, debug)

def fill(self, egs):
Expand Down
3 changes: 2 additions & 1 deletion python/selections.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,8 @@ def compare_selections(sel1, sel2):

wps = working_points_histomax[version]
labels = ['LE', 'HE']
wls = zip(wps, labels, strict=False)

wls = zip(wps, labels)
# for i,
tphgc_egbdt_sel = []

Expand Down
38 changes: 26 additions & 12 deletions python/tree_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import awkward as ak
import vector

import coffea
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema, BaseSchema

vector.register_awkward()

class TreeReader:
Expand Down Expand Up @@ -94,28 +97,40 @@ def getDataFrame(self, prefix, entry_block, fallback=None):
if br.startswith(f'{prefix}_') and
br != f'{prefix}_n']
names = ['_'.join(br.split('_')[1:]) for br in branches]
name_map = dict(zip(names, branches, strict=False))
name_map = dict(zip(names, branches))
if len(branches) == 0:
if fallback is not None:
return self.getDataFrame(prefix=fallback, entry_block=entry_block)
prefs = set([br.split('_')[0] for br in self._branches])
print(f'stored branch prefixes are: {prefs}')
raise ValueError(f'[TreeReader::getDataFrame] No branches with prefix: {prefix}')

akarray = self.tree.arrays(names,
library='ak',
aliases=name_map,
entry_start=self.file_entry,
entry_stop=self.file_entry+entry_block)
dask_akarray = NanoEventsFactory.from_root(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still up to date? did you try avoiding the loopo over the files opening all of them at once moving the NanoEventsFactory to the analyzer module?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it is up-to-date.

self.tree,
schemaclass=NanoAODSchema).events()

#akarray = self.tree.arrays(names,
#library='ak',
#aliases=name_map,
#entry_start=self.file_entry,
#entry_stop=self.file_entry+entry_block)
#print("[0] prefix to select: ", prefix)

dask_akarray = dask_akarray[prefix]
#print("[1] Selected fields from prefix", dask_akarray.fields)

dask_akarray = dask_akarray[names]

#print("[2] specific fields with names", dask_akarray.fields)
dask_akarray = dask_akarray[self.file_entry : self.file_entry + entry_block]

# print(akarray)
records = {}
for field in akarray.fields:
records[field] = akarray[field]
for field in dask_akarray.fields:
records[field] = dask_akarray[field]

if 'pt' in names and 'eta' in names and 'phi' in names:
if 'mass' not in names and 'energy' not in names:
records['mass'] = 0.*akarray['pt']
records['mass'] = 0.*dask_akarray['pt']
return vector.zip(records)

return ak.zip(records)
Expand All @@ -124,5 +139,4 @@ def getDataFrame(self, prefix, entry_block, fallback=None):
# ele_rec = ak.zip({'pt': tkele.pt, 'eta': tkele.eta, 'phi': tkele.phi}, with_name="pippo")
# this would allow to handle the records and assign behaviours....

# return akarray

# return akarray