From 9f04f108a836b33c6628dcfac697e63f94be5351 Mon Sep 17 00:00:00 2001 From: Benedikt Volkel Date: Mon, 23 May 2022 14:43:26 +0200 Subject: [PATCH] Modularise sim WF * possible to select modules to be simulated using --modules * possible to select detectors to be read out using --readout-detectors * tasks requiring inactive detectors are not added * allows for transport only using specified detectors/passive modules * digi, reco and match introduced as logical steps * consrtuct e.g. a digit task with add_digi_task specify detector, task name constructed automatically * get dependency e.g. with get_digit_need * automatic construction of source strings (e.g. used for vertexing commands or aod producer) this again returns a string only including of sources actually present based on reco and match tasks that have been successfully added * still possible to construct any task explicitly with name, needs etc as before * some tasks adjusted to their minimal dependencies * small adjustments in o2dpg_qc_finalization_workflow.py to have same "_" format of tasknames per timeframe --- MC/bin/o2dpg_qc_finalization_workflow.py | 6 +- MC/bin/o2dpg_sim_workflow.py | 663 ++++++++++++++--------- 2 files changed, 403 insertions(+), 266 deletions(-) diff --git a/MC/bin/o2dpg_qc_finalization_workflow.py b/MC/bin/o2dpg_qc_finalization_workflow.py index 3d9769725..78f82664e 100755 --- a/MC/bin/o2dpg_qc_finalization_workflow.py +++ b/MC/bin/o2dpg_qc_finalization_workflow.py @@ -44,8 +44,8 @@ def include_all_QC_finalization(ntimeframes, standalone, run, productionTag): def add_QC_finalization(taskName, qcConfigPath, needs=None): if standalone == True: needs = [] - elif needs == None: - needs = [taskName + '_local' + str(tf) for tf in range(1, ntimeframes + 1)] + elif needs is None: + needs = [taskName + f'_local_{tf}' for tf in range(1, ntimeframes + 1)] task = createTask(name=QC_finalize_name(taskName), needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000') task['cmd'] = f'o2-qc --config 
{qcConfigPath} --remote-batch {taskName}.root' + \ @@ -74,7 +74,7 @@ def add_QC_postprocessing(taskName, qcConfigPath, needs, runSpecific, prodSpecif ## The list of remote-batch workflows (reading the merged QC tasks results, applying Checks, uploading them to QCDB) MFTDigitsQCneeds = [] for flp in range(5): - MFTDigitsQCneeds.extend(['mftDigitsQC'+str(flp)+'_local'+str(tf) for tf in range(1, ntimeframes + 1)]) + MFTDigitsQCneeds.extend([f'mftDigitsQC{flp}_local_{tf}' for tf in range(1, ntimeframes + 1)]) add_QC_finalization('mftDigitsQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-digit-0.json', MFTDigitsQCneeds) add_QC_finalization('mftClustersQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-cluster.json') add_QC_finalization('mftAsyncQC', 'json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-async.json') diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py index a2893ee90..bae702b26 100755 --- a/MC/bin/o2dpg_sim_workflow.py +++ b/MC/bin/o2dpg_sim_workflow.py @@ -80,6 +80,8 @@ + 'Offset x Number of TimeFrames x OrbitsPerTimeframe (up for further sophistication)', default=0) parser.add_argument('-j',help='number of workers (if applicable)', default=8, type=int) parser.add_argument('-mod',help='Active modules (deprecated)', default='--skipModules ZDC') +parser.add_argument('--modules', nargs="*", help='Active modules', default=['all']) +parser.add_argument('--readout-detectors', nargs="*", help='Active readout detectors', default=['all']) parser.add_argument('--with-ZDC', action='store_true', help='Enable ZDC in workflow') parser.add_argument('-seed',help='random seed number', default=None) parser.add_argument('-o',help='output workflow file', default='workflow.json') @@ -168,11 +170,32 @@ def load_external_config(configfile): print ("** Using generic config **") anchorConfig = create_sim_config(args) +# detectors that are always required +DETECTORS_ALWAYS_ON = ["ITS", "CTP", "FT0", "FV0"] # with this we can tailor the workflow to the presence 
of # certain detectors -activeDetectors = anchorConfig.get('o2-ctf-reader-workflow-options',{}).get('onlyDet','all') +activeDetectors = anchorConfig.get('o2-ctf-reader-workflow-options',{}).get('onlyDet', None) +# it is a ","-separeted string of detectors when it comes from the anchored configuration, otherwise a list from cmd +activeDetectors = activeDetectors.split(",") if activeDetectors is not None else args.readout_detectors # convert to set/hashmap -activeDetectors = { det:1 for det in activeDetectors.split(",") } +activeDetectors = {det: 1 for det in activeDetectors} +# a list of all enabled modules +activeModules = args.modules +# deactivated modules, for backward compatibility with ZDC. +# ZDC is therefore only added when specified explicitly in args.modules or when args.with_ZDC +MODULES, inactiveModules = ("--skipModules ZDC", ["ZDC"]) if not args.with_ZDC or "ZDC" not in activeModules else ("", []) +if "all" not in activeModules: + activeModules.extend(DETECTORS_ALWAYS_ON) +if "all" not in activeDetectors: + if IS_ANCHORED_RUN: + # Issue a warning for each detector that should be switched on + for dao in DETECTORS_ALWAYS_ON: + if dao not in activeDetectors: + print(f"WARNING: Detector {dao} should always be there. 
The workflow might crash at some point") + else: + activeDetectors.extend(DETECTORS_ALWAYS_ON) + +print(activeModules) # see if a detector is in list of activeDetectors def isActive(detID): @@ -180,14 +203,13 @@ def isActive(detID): detID == the usual detID string (3 letters) """ if "all" in activeDetectors: - return True + return True if (detID in activeModules or "all" in activeModules) and detID not in inactiveModules else False return detID in activeDetectors def addWhenActive(detID, needslist, appendstring): if isActive(detID): needslist.append(appendstring) - def retrieve_sor(run_number): """ retrieves start of run (sor) @@ -209,6 +231,236 @@ def retrieve_sor(run_number): return int(SOR) +# collect all task names that were successfully added +class TaskLookup: + """ + Lookup of added tasks as well as mapping of task names + to digi, reco and match stages per detector + """ + def __init__(self): + self.added_task_names = [] + self.tfs_dets_stages = [] + + @staticmethod + def adjust_det_stage(det, stage): + """ + Make sure to always use upper(lower) case for detectors(stages) + """ + return det.upper(), stage.lower() + + def add_name(self, name): + """ + Simply add a task by its name + """ + self.added_task_names.append(name) + return True + + def add_det_stage_tf(self, det, stage, tf, name, flag): + """ + Map a task name to detector and stage (per timeframe) + """ + if tf < 1: + print(f"ERROR: Invalid timeframe {tf}") + return False + ind = tf - 1 + det, stage = self.adjust_det_stage(det, stage) + if ind >= len(self.tfs_dets_stages): + for _ in range(len(self.tfs_dets_stages), ind + 1): + self.tfs_dets_stages.append({}) + dets_stages = self.tfs_dets_stages[ind] + if det not in dets_stages: + dets_stages[det] = {} + dets_stages[det][stage] = (flag, name) + return True + + def retrieve_det_stage_tf(self, det, stage, tf, consider_not_added=False): + """ + Retrieve the task name for a given detector, stage and timeframe + Decide if also those should be considered 
that were not added to the final workflow + """ + if tf < 1: + print(f"ERROR: Invalid timeframe {tf}") + return None + ind = tf - 1 + if ind >= len(self.tfs_dets_stages): + return None + det, stage = self.adjust_det_stage(det, stage) + dets_stages = self.tfs_dets_stages[ind] + + if det == "ANY": + # collect any tasks for specific stage for all detectors present + ret_names = [] + for d, values in dets_stages.items(): + if stage in values and values[stage][0]: + ret_names.append(values[stage][1]) + # early return + return ret_names + + # otherwise, retrieve detector specific task for requested stage + if det not in dets_stages: + return None + if stage not in dets_stages[det]: + return None + return dets_stages[det][stage][1] if (dets_stages[det][stage][0] or consider_not_added) else None + +# Use one global object for lookups +TASK_LOOKUP = TaskLookup() + +def adjust_and_make_sources_string_detectors(dets): + """ + Make a source string a la 'det1,det2,detN' for active detectors + and remove inactive detectors + """ + if isinstance(dets, str): + dets = dets.split(",") + return ",".join([d for d in dets if isActive(d)]) + +def make_sources_string_reco_match(proposed, tf, strict=None): + """ + Make a source string a la 'det1,det1-det2,det5-det6-det6,detN' for reco and match + stages of given detectors. + Take a proposed but overwrite with strict if that is given. + If not strict, remove those sources that cannot be fulfilled. If strict, fail if + at least one source is not present. 
+ """ + if strict: + proposed = strict + sources = [] + for det in proposed.split(","): + stage = "reco" + if "-" in det: + stage = "match" + + if not TASK_LOOKUP.retrieve_det_stage_tf(det, stage, tf): + if strict: + print(f"Cannot create detector source string for {strict}") + sys.exit(1) + continue + sources.append(det) + return ",".join(sources) + +def add_task(workflow, name, tf=-1, needs=None, condition=True, return_success=False, **kwargs): + """ + Final function to a add a task by name. + Can be used as usual by specifying the full name of a task + (making sure it contains some uniqueness if added per timeframe) + """ + def make_return(success, what): + if return_success: + return success, what + return what + + if not needs: + needs = [] + task = createTask(name=name, needs=needs, tf=tf, **kwargs) + + for need in needs: + if not need or need not in TASK_LOOKUP.added_task_names: + return make_return(False, task) + + if not condition: + return make_return(False, task) + + # The task and name are only added if needs and condition are fulfilled + workflow.append(task) + TASK_LOOKUP.add_name(name) + return make_return(True, task) + +def add_det_task(workflow, det, stage, tf, needs, conditions=None, **kwargs): + """ + Add a task for a detector and a given stage (digi, reco, match) + """ + if not conditions: + conditions = [True] * len(det) + dets_on = [d for d, on in zip(det, conditions) if on] + name = "_".join((*det, stage, str(tf))) + # create task and decide if can be successfully added + success, task = add_task(workflow, name, tf, needs, condition=dets_on, return_success=True, **kwargs) + # either way, add to lookup map but set flag accordingly to specify if actually been added to the workflow + # do this for each detector separately. 
Fo rinstance, we have a common digitisation task that digitises "smaller" detectors all at once + for d, on in zip(det, conditions): + TASK_LOOKUP.add_det_stage_tf(d, stage, tf, name, success and on) + + return task + +def add_digi_task(workflow, det, tf, needs, condition=True, **kwargs): + """ + Wrapper to add digitisation task + """ + if isinstance(det, str): + det = [det] + conditions = [isActive(d.upper()) and condition for d in det] + return add_det_task(workflow, det, "digi", tf, needs, conditions, **kwargs) + +def add_reco_task(workflow, det, tf, needs, condition=True, **kwargs): + """ + Wrapper to add reconstruction task + """ + if isinstance(det, str): + det = [det] + conditions = [isActive(d.upper()) and condition for d in det] + return add_det_task(workflow, det, "reco", tf, needs, conditions, **kwargs) + +def add_match_task(workflow, det, tf, needs, condition=True, **kwargs): + """ + Wrapper to add match task + """ + if isinstance(det, str): + det = [det] + conditions = [] + for d in det: + single_dets = d.split("-") + if not all([isActive(sd.upper()) for sd in d.split("-")]): + conditions.append(False) + continue + conditions.append(condition) + return add_det_task(workflow, det, "match", tf, needs, conditions, **kwargs) + +def add_as_match_task(task, det, tf): + """ + Add a task that has been added also to be a match task + For instance used for TRD reco task which then also serves as a matching task + """ + if task["name"] in TASK_LOOKUP.added_task_names: + TASK_LOOKUP.add_det_stage_tf(det, "match", tf, task["name"], True) + +def stage_task_name(det, stage, tf): + """ + Retrieve a task name based on a stage and detector name + """ + return TASK_LOOKUP.retrieve_det_stage_tf(det, stage, tf) #, True) + +def digi_task_name(det, tf): + """ + Wrapper to retrieve name of digitisation task + """ + return stage_task_name(det, "digi", tf) + +def reco_task_name(det, tf): + """ + Wrapper to retrieve name of reconstruction task + """ + return stage_task_name(det, 
"reco", tf) + +def match_task_name(det, tf): + """ + Wrapper to retrieve name of match task + """ + return stage_task_name(det, "match", tf) + +def add_predefined_tasks(workflow, tasks): + """ + Add a task that was defined/created other than using add_task or add_det_task + """ + for task in tasks: + to_be_added = True + for need in task["needs"]: + if need not in TASK_LOOKUP.added_task_names: + to_be_added = False + break + if to_be_added: + workflow.append(task) + # ----------- START WORKFLOW CONSTRUCTION ----------------------------- # set the time to start of run (if no timestamp specified) @@ -218,7 +470,6 @@ def retrieve_sor(run_number): NTIMEFRAMES=int(args.tf) NWORKERS=args.j -MODULES = "--skipModules ZDC" if not args.with_ZDC else "" SIMENGINE=args.e BFIELD=args.field RNDSEED=args.seed # typically the argument should be the jobid, but if we get None the current time is used for the initialisation @@ -228,6 +479,7 @@ def retrieve_sor(run_number): workflow={} workflow['stages'] = [] +stages = workflow["stages"] def getDPL_global_options(bigshm=False): @@ -251,12 +503,11 @@ def getDPL_global_options(bigshm=False): # create the GRPs orbitsPerTF=256 -GRP_TASK = createTask(name='grpcreate', cpu='0') +GRP_TASK = add_task(stages, 'grpcreate', lab=["GRP"], cpu='0') GRP_TASK['cmd'] = 'o2-grp-simgrp-tool createGRPs --run ' + str(args.run) + ' --publishto ${ALICEO2_CCDB_LOCALCACHE:-.ccdb} -o grp --hbfpertf ' + str(orbitsPerTF) + ' --field ' + args.field GRP_TASK['cmd'] += ' --readoutDets ' + " ".join(activeDetectors) + ' --print ' if len(args.bcPatternFile) > 0: GRP_TASK['cmd'] += ' --bcPatternFile ' + str(args.bcPatternFile) -workflow['stages'].append(GRP_TASK) if doembedding: if not usebkgcache: @@ -315,7 +566,8 @@ def getDPL_global_options(bigshm=False): CONFKEYBKG=' --configKeyValues "' + args.confKeyBkg + '"' # Background PYTHIA configuration - BKG_CONFIG_task=createTask(name='genbkgconf') + BKG_CONFIG_task = add_task(stages, 'genbkgconf') + 
BKG_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"' if GENBKG == 'pythia8': print('Background generator seed: ', SIMSEED) @@ -332,30 +584,25 @@ def getDPL_global_options(bigshm=False): # TODO: we need a proper config container/manager so as to combine these local configs with external configs etc. CONFKEYBKG='--configKeyValues "GeneratorPythia8.config=pythia8bkg.cfg;' + args.confKeyBkg + '"' - workflow['stages'].append(BKG_CONFIG_task) - # background task configuration INIBKG='' if args.iniBkg!= '': INIBKG=' --configFile ' + args.iniBkg - BKGtask=createTask(name='bkgsim', lab=["GEANT"], needs=[BKG_CONFIG_task['name'], GRP_TASK['name']], cpu=NWORKERS ) + BKGtask = add_task(stages, 'bkgsim', needs=[BKG_CONFIG_task['name'], GRP_TASK['name']], lab=["GEANT"], cpu=NWORKERS ) BKGtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + SIMENGINE + ' -j ' + str(NWORKERS) + ' -n ' + str(NBKGEVENTS) \ + ' -g ' + str(GENBKG) + ' ' + str(MODULES) + ' -o bkg ' + str(INIBKG) \ + ' --field ' + str(BFIELD) + ' ' + str(CONFKEYBKG) \ - + ('',' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) + + ('',' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) + ' -m ' + " ".join(activeModules) if not "all" in activeDetectors: BKGtask['cmd'] += ' --readoutDetectors ' + " ".join(activeDetectors) - workflow['stages'].append(BKGtask) - # check if we should upload background event - if args.upload_bkg_to!=None: - BKGuploadtask=createTask(name='bkgupload', needs=[BKGtask['name']], cpu='0') + if args.upload_bkg_to is not None: + BKGuploadtask = add_task(stages, 'bkgupload', needs=[BKGtask['name']], cpu='0') BKGuploadtask['cmd']='alien.py mkdir ' + args.upload_bkg_to + ';' BKGuploadtask['cmd']+='alien.py cp -f bkg* ' + args.upload_bkg_to + ';' - workflow['stages'].append(BKGuploadtask) else: # here we are reusing existing background events from ALIEN @@ -369,39 +616,33 @@ def getDPL_global_options(bigshm=False): # we can introduce a 
"retry" feature in the copy process) # Step 1: header and link files - BKG_HEADER_task=createTask(name='bkgdownloadheader', cpu='0', lab=['BKGCACHE']) + BKG_HEADER_task = add_task(stages, 'bkgdownloadheader', cpu='0', lab=['BKGCACHE']) BKG_HEADER_task['cmd']='alien.py cp ' + args.use_bkg_from + 'bkg_MCHeader.root .' BKG_HEADER_task['cmd']=BKG_HEADER_task['cmd'] + ';alien.py cp ' + args.use_bkg_from + 'bkg_geometry.root .' BKG_HEADER_task['cmd']=BKG_HEADER_task['cmd'] + ';alien.py cp ' + args.use_bkg_from + 'bkg_grp.root .' - workflow['stages'].append(BKG_HEADER_task) # a list of smaller sensors (used to construct digitization tasks in a parametrized way) -smallsensorlist = [ "ITS", "TOF", "FDD", "MCH", "MID", "MFT", "HMP", "EMC", "PHS", "CPV" ] -if args.with_ZDC: - smallsensorlist += [ "ZDC" ] +smallsensorlist = [ "ITS", "TOF", "FDD", "MCH", "MID", "MFT", "HMP", "EMC", "PHS", "CPV", "ZDC"] # a list of detectors that serve as input for the trigger processor CTP --> these need to be processed together for now ctp_trigger_inputlist = [ "FT0", "FV0" ] BKG_HITDOWNLOADER_TASKS={} for det in [ 'TPC', 'TRD' ] + smallsensorlist + ctp_trigger_inputlist: if usebkgcache: - BKG_HITDOWNLOADER_TASKS[det] = createTask(str(det) + 'hitdownload', cpu='0', lab=['BKGCACHE']) + BKG_HITDOWNLOADER_TASKS[det] = add_task(stages, str(det) + 'hitdownload', cpu='0', lab=['BKGCACHE']) BKG_HITDOWNLOADER_TASKS[det]['cmd'] = 'alien.py cp ' + args.use_bkg_from + 'bkg_Hits' + str(det) + '.root .' - workflow['stages'].append(BKG_HITDOWNLOADER_TASKS[det]) else: BKG_HITDOWNLOADER_TASKS[det] = None if usebkgcache: - BKG_KINEDOWNLOADER_TASK = createTask(name='bkgkinedownload', cpu='0', lab=['BKGCACHE']) + BKG_KINEDOWNLOADER_TASK = add_task(stages, 'bkgkinedownload', cpu='0', lab=['BKGCACHE']) BKG_KINEDOWNLOADER_TASK['cmd'] = 'alien.py cp ' + args.use_bkg_from + 'bkg_Kine.root .' 
- workflow['stages'].append(BKG_KINEDOWNLOADER_TASK) # We download some binary files, necessary for processing # Eventually, these files/objects should be queried directly from within these tasks? -MATBUD_DOWNLOADER_TASK = createTask(name='matbuddownloader', cpu='0') +MATBUD_DOWNLOADER_TASK = add_task(stages, 'matbuddownloader', cpu='0') MATBUD_DOWNLOADER_TASK['cmd'] = '[ -f matbud.root ] || ${O2_ROOT}/bin/o2-ccdb-downloadccdbfile --host http://alice-ccdb.cern.ch/ -p GLO/Param/MatLUT -o matbud.root --no-preserve-path --timestamp ' + str(args.timestamp) -workflow['stages'].append(MATBUD_DOWNLOADER_TASK) # loop over timeframes for tf in range(1, NTIMEFRAMES + 1): @@ -476,7 +717,7 @@ def getDPL_global_options(bigshm=False): exit(1) # produce the signal configuration - SGN_CONFIG_task=createTask(name='gensgnconf_'+str(tf), tf=tf, cwd=timeframeworkdir) + SGN_CONFIG_task= add_task(stages, f'gensgnconf_{tf}', tf=tf, cwd=timeframeworkdir) SGN_CONFIG_task['cmd'] = 'echo "placeholder / dummy task"' if GENERATOR == 'pythia8' and PROCESS!='': SGN_CONFIG_task['cmd'] = '${O2DPG_ROOT}/MC/config/common/pythia8/utils/mkpy8cfg.py \ @@ -504,8 +745,6 @@ def getDPL_global_options(bigshm=False): # print('o2dpg_sim_workflow: Error! 
configuration file not provided') # exit(1) - workflow['stages'].append(SGN_CONFIG_task) - # ----------------- # transport signals # ----------------- @@ -520,22 +759,21 @@ def getDPL_global_options(bigshm=False): signalneeds = signalneeds + [ BKGtask['name'] ] else: signalneeds = signalneeds + [ BKG_HEADER_task['name'] ] - SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], relative_cpu=5/8, n_workers=NWORKERS, mem='2000') + SGNtask = add_task(stages, f'sgnsim_{tf}', needs=signalneeds, tf=tf, cwd=timeframeworkdir, lab=["GEANT"], relative_cpu=5/8, n_workers=NWORKERS, mem='2000') SGNtask['cmd']='${O2_ROOT}/bin/o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' --seed ' + str(TFSEED) \ + ' --field ' + str(BFIELD) + ' -j ' + str(NWORKERS) + ' -g ' + str(GENERATOR) \ + ' ' + str(TRIGGER) + ' ' + str(CONFKEY) + ' ' + str(INIFILE) \ - + ' -o ' + signalprefix + ' ' + embeddinto \ + + ' -o ' + signalprefix + ' ' + embeddinto + ' -m ' + " ".join(activeModules) \ + ('', ' --timestamp ' + str(args.timestamp))[args.timestamp!=-1] + ' --run ' + str(args.run) if not "all" in activeDetectors: SGNtask['cmd'] += ' --readoutDetectors ' + " ".join(activeDetectors) - workflow['stages'].append(SGNtask) # some tasks further below still want geometry + grp in fixed names, so we provide it here # Alternatively, since we have timeframe isolation, we could just work with standard o2sim_ files # We need to be careful here and distinguish between embedding and non-embedding cases # (otherwise it can confuse itstpcmatching, see O2-2026). This is because only one of the GRPs is updated during digitization. 
if doembedding: - LinkGRPFileTask=createTask(name='linkGRP_'+str(tf), needs=[BKG_HEADER_task['name'] if usebkgcache else BKGtask['name'] ], tf=tf, cwd=timeframeworkdir, cpu='0',mem='0') + LinkGRPFileTask = add_task(stages, f'linkGRP_{tf}', needs=[BKG_HEADER_task['name'] if usebkgcache else BKGtask['name'] ], tf=tf, cwd=timeframeworkdir, cpu='0',mem='0') LinkGRPFileTask['cmd']=''' ln -nsf ../bkg_grp.root o2sim_grp.root; ln -nsf ../bkg_grpecs.root o2sim_grpecs.root; @@ -548,9 +786,8 @@ def getDPL_global_options(bigshm=False): ln -nsf ../bkg_grpecs.root bkg_grpecs.root ''' else: - LinkGRPFileTask=createTask(name='linkGRP_'+str(tf), needs=[SGNtask['name']], tf=tf, cwd=timeframeworkdir, cpu='0', mem='0') + LinkGRPFileTask = add_task(stages, f'linkGRP_{tf}', needs=[SGNtask['name']], tf=tf, cwd=timeframeworkdir, cpu='0', mem='0') LinkGRPFileTask['cmd']='ln -nsf ' + signalprefix + '_grp.root o2sim_grp.root ; ln -nsf ' + signalprefix + '_geometry.root o2sim_geometry.root; ln -nsf ' + signalprefix + '_geometry-aligned.root o2sim_geometry-aligned.root' - workflow['stages'].append(LinkGRPFileTask) # ------------------ # digitization steps @@ -641,7 +878,7 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): # This task creates the basic setup for all digitizers! 
all digitization configKeyValues need to be given here - ContextTask = createTask(name='digicontext_'+str(tf), needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='1') + ContextTask = add_task(stages, f'digicontext_{tf}', needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='1') # this is just to have the digitizer ini file ContextTask['cmd'] = '${O2_ROOT}/bin/o2-sim-digitizer-workflow --only-context --interactionRate ' + str(INTRATE) \ + ' ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption \ @@ -658,27 +895,21 @@ def putConfigValuesNew(listOfMainKeys=[], localCF = {}): if BCPATTERN != '': ContextTask['cmd'] += ' --bcPatternFile "' + BCPATTERN + '"' - workflow['stages'].append(ContextTask) - tpcdigineeds=[ContextTask['name'], LinkGRPFileTask['name']] if usebkgcache: tpcdigineeds += [ BKG_HITDOWNLOADER_TASKS['TPC']['name'] ] - TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=tpcdigineeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='9000') + TPCDigitask = add_digi_task(stages, 'tpc', needs=tpcdigineeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='9000') TPCDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTPC.root . 
;')[doembedding] TPCDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TPC --interactionRate ' + str(INTRATE) + ' --tpc-lanes ' + str(NWORKERS) + ' --incontext ' + str(CONTEXTFILE) + ' --tpc-chunked-writer --disable-write-ini ' + putConfigValuesNew(["TPCGasParam"]) - workflow['stages'].append(TPCDigitask) trddigineeds = [ContextTask['name']] if usebkgcache: trddigineeds += [ BKG_HITDOWNLOADER_TASKS['TRD']['name'] ] - TRDDigitask=createTask(name='trddigi_'+str(tf), needs=trddigineeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000') + TRDDigitask = add_digi_task(stages, 'TRD', needs=trddigineeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu=NWORKERS, mem='8000') TRDDigitask['cmd'] = ('','ln -nfs ../bkg_HitsTRD.root . ;')[doembedding] TRDDigitask['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TRD --interactionRate ' + str(INTRATE) + putConfigValuesNew(localCF={"TRDSimParams.digithreads" : NWORKERS}) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' - if isActive("TRD"): - workflow['stages'].append(TRDDigitask) + # these are digitizers which are single threaded def createRestDigiTask(name, det='ALLSMALLER'): @@ -689,249 +920,185 @@ def createRestDigiTask(name, det='ALLSMALLER'): if usebkgcache: for d in itertools.chain(smallsensorlist, ctp_trigger_inputlist): tneeds += [ BKG_HITDOWNLOADER_TASKS[d]['name'] ] - t = createTask(name=name, needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + t = add_digi_task(stages, smallsensorlist, needs=tneeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') t['cmd'] = ('','ln -nfs ../bkg_Hits*.root . 
;')[doembedding] - detlist = '' - for d in smallsensorlist: - if isActive(d): - if len(detlist) > 0: - detlist += ',' - detlist += d - t['cmd'] += commondigicmd + ' --onlyDet ' + detlist - t['cmd'] += ' --ccdb-tof-sa ' - t['cmd'] += (' --combine-devices ','')[args.no_combine_dpl_devices] - workflow['stages'].append(t) + onlyDet = adjust_and_make_sources_string_detectors(smallsensorlist) + if onlyDet: + t['cmd'] += commondigicmd + f' --onlyDet {onlyDet}' + t['cmd'] += ' --ccdb-tof-sa ' + t['cmd'] += (' --combine-devices ', '')[args.no_combine_dpl_devices] return t else: # here we create individual digitizers - if isActive(det): - if usebkgcache: + if usebkgcache: tneeds += [ BKG_HITDOWNLOADER_TASKS[det]['name'] ] - t = createTask(name=name, needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') - t['cmd'] = ('','ln -nfs ../bkg_Hits' + str(det) + '.root . ;')[doembedding] - t['cmd'] += commondigicmd + ' --onlyDet ' + str(det) - if det == 'TOF': - t['cmd'] += ' --ccdb-tof-sa' - workflow['stages'].append(t) - return t + t = add_digi_task(stages, name, needs=tneeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + t['cmd'] = ('','ln -nfs ../bkg_Hits' + str(det) + '.root . 
;')[doembedding] + t['cmd'] += commondigicmd + ' --onlyDet ' + str(det) + if det == 'TOF': + t['cmd'] += ' --ccdb-tof-sa' + return t - det_to_digitask={} + det_to_digitask = {} - if not args.no_combine_smaller_digi==True: - det_to_digitask['ALLSMALLER']=createRestDigiTask("restdigi_"+str(tf)) + if not args.no_combine_smaller_digi: + det_to_digitask['ALLSMALLER']=createRestDigiTask("restdigi") for det in smallsensorlist: - name=str(det).lower() + "digi_" + str(tf) - t = det_to_digitask['ALLSMALLER'] if (not args.no_combine_smaller_digi==True) else createRestDigiTask(name, det) + name=str(det).lower() + "digi" + t = det_to_digitask['ALLSMALLER'] if not args.no_combine_smaller_digi else createRestDigiTask(name, det) det_to_digitask[det]=t # detectors serving CTP need to be treated somewhat special since CTP needs # these inputs at the same time --> still need to be made better tneeds = [ContextTask['name']] - t = createTask(name="ft0fv0ctp_digi_" + str(tf), needs=tneeds, - tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1') + sources = adjust_and_make_sources_string_detectors("FT0,FV0,CTP") + t = add_digi_task(stages, ["FT0", "FV0", "CTP"], needs=tneeds, tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1', condition=sources=="FT0,FV0,CTP") t['cmd'] = ('','ln -nfs ../bkg_HitsFT0.root . ; ln -nfs ../bkg_HitsFV0.root . 
;')[doembedding] - t['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet FT0,FV0,CTP --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' + putConfigValuesNew() + (' --combine-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(t) + t['cmd'] += '${O2_ROOT}/bin/o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet ' + sources + ' --interactionRate ' + str(INTRATE) + ' --incontext ' + str(CONTEXTFILE) + ' --disable-write-ini' + putConfigValuesNew() + (' --combine-devices', '')[args.no_combine_dpl_devices] det_to_digitask["FT0"]=t det_to_digitask["FV0"]=t - def getDigiTaskName(det): - t = det_to_digitask.get(det) - if t == None: - return "undefined" - return t['name'] - # ----------- # reco # ----------- tpcreconeeds=[] if not args.combine_tpc_clusterization: - # We treat TPC clusterization in multiple (sector) steps in order to + # We treat TPC clusterization in multiple (sector) steps in order to # stay within the memory limit or to parallelize over sector from outside (not yet supported within cluster algo) tpcclustertasks=[] sectorpertask=18 for s in range(0,35,sectorpertask): - taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + '_' + str(tf) + taskname = 'tpcclusterpart' + str((int)(s/sectorpertask)) + f"_{tf}" tpcclustertasks.append(taskname) - tpcclussect = createTask(name=taskname, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000') + tpcclussect = add_task(stages, taskname, needs=[digi_task_name("TPC", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='2', mem='8000') tpcclussect['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' --tpc-lanes ' + str(NWORKERS) tpcclussect['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' 
--input-type digitizer --output-type clusters,send-clusters-per-sector --outfile tpc-native-clusters-part' + str((int)(s/sectorpertask)) + '.root --tpc-sectors ' + str(s)+'-'+str(s+sectorpertask-1) + ' ' + putConfigValuesNew(["GPU_global"], {"GPU_proc.ompThreads" : 4}) tpcclussect['env'] = { "OMP_NUM_THREADS" : "4", "SHMSIZE" : "16000000000" } - workflow['stages'].append(tpcclussect) - - TPCCLUSMERGEtask=createTask(name='tpcclustermerge_'+str(tf), needs=tpcclustertasks, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='10000') + TPCCLUSMERGEtask = add_task(stages, name=f'tpcclustermerge_{tf}', needs=tpcclustertasks, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='10000') TPCCLUSMERGEtask['cmd']='${O2_ROOT}/bin/o2-commonutils-treemergertool -i tpc-native-clusters-part*.root -o tpc-native-clusters.root -t tpcrec' #--asfriend preferable but does not work - workflow['stages'].append(TPCCLUSMERGEtask) - tpcreconeeds.append(TPCCLUSMERGEtask['name']) + tpcreconeeds.append(TPCCLUSMERGEtask["name"]) else: - tpcclus = createTask(name='tpccluster_' + str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000') + tpcclus = add_task(stages, f'tpccluster_{tf}', needs=[digi_task_name("TPC", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='2000') tpcclus['cmd'] = '${O2_ROOT}/bin/o2-tpc-chunkeddigit-merger --tpc-lanes ' + str(NWORKERS) tpcclus['cmd'] += ' | ${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options() + ' --input-type digitizer --output-type clusters,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam"],{"GPU_proc.ompThreads" : 1}) - workflow['stages'].append(tpcclus) - tpcreconeeds.append(tpcclus['name']) + tpcreconeeds.append(tpcclus["name"]) - TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=tpcreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000') + TPCRECOtask = add_reco_task(stages, 'tpc', needs=tpcreconeeds, tf=tf, 
cwd=timeframeworkdir, lab=["RECO"], relative_cpu=3/8, mem='16000') TPCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --input-type clusters --output-type tracks,send-clusters-per-sector ' + putConfigValuesNew(["GPU_global","TPCGasParam"], {"GPU_proc.ompThreads":NWORKERS}) - workflow['stages'].append(TPCRECOtask) - ITSRECOtask=createTask(name='itsreco_'+str(tf), needs=[getDigiTaskName("ITS"), MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + ITSRECOtask = add_reco_task(stages, 'its', needs=[digi_task_name("ITS", tf), MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') ITSRECOtask['cmd'] = '${O2_ROOT}/bin/o2-its-reco-workflow --trackerCA --tracking-mode async ' + getDPL_global_options() \ + putConfigValuesNew(["ITSVertexerParam", "ITSAlpideParam", 'ITSClustererParam'], {"NameConf.mDirMatLUT" : ".."}) - workflow['stages'].append(ITSRECOtask) - - FT0RECOtask=createTask(name='ft0reco_'+str(tf), needs=[getDigiTaskName("FT0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') + FT0RECOtask = add_reco_task(stages, 'FT0', needs=[digi_task_name("FT0", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') FT0RECOtask['cmd'] = '${O2_ROOT}/bin/o2-ft0-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(FT0RECOtask) - ITSTPCMATCHtask=createTask(name='itstpcMatch_'+str(tf), needs=[TPCRECOtask['name'], ITSRECOtask['name'], FT0RECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', relative_cpu=3/8) + ITSTPCMATCHtask = add_match_task(stages, "ITS-TPC", needs=[reco_task_name("ITS", tf), reco_task_name("TPC", tf), reco_task_name("FT0", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', relative_cpu=3/8) ITSTPCMATCHtask['cmd']= '${O2_ROOT}/bin/o2-tpcits-match-workflow ' + getDPL_global_options(bigshm=True) + ' --tpc-track-reader \"tpctracks.root\" 
--tpc-native-cluster-reader \"--infile tpc-native-clusters.root\" --use-ft0' + putConfigValuesNew(['MFTClustererParam', 'ITSCATrackerParam', 'tpcitsMatch', 'TPCGasParam', 'ITSClustererParam'], {"NameConf.mDirMatLUT" : ".."}) - workflow['stages'].append(ITSTPCMATCHtask) - TRDTRACKINGtask = createTask(name='trdreco_'+str(tf), needs=[TRDDigitask['name'], ITSTPCMATCHtask['name'], TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + trd_track_sources = make_sources_string_reco_match("TPC,ITS-TPC", tf, anchorConfig.get("o2-trd-global-tracking-options",{}).get("track-sources", None)) + trd_tracking_needs = [digi_task_name("TRD", tf), reco_task_name("TPC", tf)] + trd_track_sources = f"--track-sources {trd_track_sources}" if trd_track_sources else "" + if "ITS-TPC" in trd_track_sources: + trd_tracking_needs.append(match_task_name("ITS-TPC", tf)) + TRDTRACKINGtask = add_task(stages, f'trdreco_tracklet_transformer_{tf}', needs=trd_tracking_needs, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') TRDTRACKINGtask['cmd'] = '${O2_ROOT}/bin/o2-trd-tracklet-transformer ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(TRDTRACKINGtask) # FIXME This is so far a workaround to avoud a race condition for trdcalibratedtracklets.root - TRDTRACKINGtask2 = createTask(name='trdreco2_'+str(tf), needs=[TRDTRACKINGtask['name'],MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + TRDTRACKINGtask2 = add_reco_task(stages, 'TRD', needs=[TRDTRACKINGtask['name'], MATBUD_DOWNLOADER_TASK['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000') + add_as_match_task(TRDTRACKINGtask2, "TPC-TRD", tf) + if "ITS-TPC" in trd_track_sources: + add_as_match_task(TRDTRACKINGtask2, "ITS-TPC-TRD", tf) + TRDTRACKINGtask2['cmd'] = '${O2_ROOT}/bin/o2-trd-global-tracking ' + getDPL_global_options(bigshm=True) \ + putConfigValuesNew(['ITSClustererParam', 
'ITSCATrackerParam', 'TPCGasParam'], {"NameConf.mDirMatLUT" : ".."}) \ - + " --track-sources " + anchorConfig.get("o2-trd-global-tracking-options",{}).get("track-sources","all") - workflow['stages'].append(TRDTRACKINGtask2) + + " " + trd_track_sources - TOFRECOtask = createTask(name='tofmatch_'+str(tf), needs=[ITSTPCMATCHtask['name'], getDigiTaskName("TOF")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + TOFRECOtask = add_reco_task(stages, 'TOF', needs=[digi_task_name("TOF", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') TOFRECOtask['cmd'] = '${O2_ROOT}/bin/o2-tof-reco-workflow --use-ccdb ' + getDPL_global_options() + putConfigValuesNew() - workflow['stages'].append(TOFRECOtask) - - toftpcmatchneeds = [TOFRECOtask['name'], TPCRECOtask['name']] - toftracksrcdefault = "TPC,ITS-TPC" - if isActive('TRD'): - toftpcmatchneeds.append(TRDTRACKINGtask2['name']) - toftracksrcdefault+=",TPC-TRD,ITS-TPC-TRD" - TOFTPCMATCHERtask = createTask(name='toftpcmatch_'+str(tf), needs=toftpcmatchneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000') + + toftracksrcdefault = make_sources_string_reco_match("TPC,ITS-TPC,TPC-TRD,ITS-TPC-TRD", tf, anchorConfig.get("o2-tof-matcher-workflow-options",{}).get("track-sources",None)) + tof_match_needs = [reco_task_name("TOF", tf)] + if "TRD" in toftracksrcdefault: + tof_match_needs.append(reco_task_name("TRD", tf)) + if "TPC" in toftracksrcdefault: + tof_match_needs.append(match_task_name("ITS-TPC", tf)) + TOFTPCMATCHERtask = add_match_task(stages, "TOF-TPC", needs=tof_match_needs, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1000', condition=toftracksrcdefault) TOFTPCMATCHERtask['cmd'] = '${O2_ROOT}/bin/o2-tof-matcher-workflow ' + getDPL_global_options() \ + putConfigValuesNew(["ITSClustererParam", 'TPCGasParam', 'ITSCATrackerParam', 'MFTClustererParam']) \ - + " --track-sources " + anchorConfig.get("o2-tof-matcher-workflow-options",{}).get("track-sources",toftracksrcdefault) + (' 
--combine-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(TOFTPCMATCHERtask) + + " --track-sources " + toftracksrcdefault + (' --combine-devices', '')[args.no_combine_dpl_devices] - # MFT reco: needing access to kinematics (when assessment enabled) - mftreconeeds = [getDigiTaskName("MFT")] + mftreconeeds = [digi_task_name("MFT", tf)] if usebkgcache: mftreconeeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] - - MFTRECOtask = createTask(name='mftreco_'+str(tf), needs=mftreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') - MFTRECOtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . ;')[doembedding] - MFTRECOtask['cmd'] += '${O2_ROOT}/bin/o2-mft-reco-workflow ' + getDPL_global_options() + putConfigValuesNew(['MFTTracking', 'MFTAlpideParam', 'ITSClustererParam','MFTClustererParam']) + MFTRECOtask = add_reco_task(stages, 'MFT', needs=mftreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MFTRECOtask['cmd'] = '${O2_ROOT}/bin/o2-mft-reco-workflow ' + getDPL_global_options() + putConfigValuesNew(['MFTTracking', 'MFTAlpideParam', 'ITSClustererParam','MFTClustererParam']) if args.mft_assessment_full == True: MFTRECOtask['cmd']+= ' --run-assessment ' - workflow['stages'].append(MFTRECOtask) # MCH reco: needing access to kinematics ... so some extra logic needed here - mchreconeeds = [getDigiTaskName("MCH")] + mchreconeeds = [digi_task_name("MCH", tf)] if usebkgcache: mchreconeeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] - - MCHRECOtask = createTask(name='mchreco_'+str(tf), needs=mchreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MCHRECOtask = add_reco_task(stages, "MCH", needs=mchreconeeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MCHRECOtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . 
;')[doembedding] MCHRECOtask['cmd'] += '${O2_ROOT}/bin/o2-mch-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(MCHRECOtask) - MIDRECOtask = createTask(name='midreco_'+str(tf), needs=[getDigiTaskName("MID")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MIDRECOtask = add_reco_task(stages, 'MID', needs=[digi_task_name("MID", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MIDRECOtask['cmd'] = '${O2_ROOT}/bin/o2-mid-digits-reader-workflow | ${O2_ROOT}/bin/o2-mid-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(MIDRECOtask) - FDDRECOtask = createTask(name='fddreco_'+str(tf), needs=[getDigiTaskName("FDD")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + FDDRECOtask = add_reco_task(stages, 'FDD', needs=[digi_task_name("FDD", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') FDDRECOtask['cmd'] = '${O2_ROOT}/bin/o2-fdd-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(FDDRECOtask) - FV0RECOtask = createTask(name='fv0reco_'+str(tf), needs=[getDigiTaskName("FV0")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + FV0RECOtask = add_reco_task(stages, 'FV0', needs=[digi_task_name("FV0", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') FV0RECOtask['cmd'] = '${O2_ROOT}/bin/o2-fv0-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(FV0RECOtask) # calorimeters - EMCRECOtask = createTask(name='emcalreco_'+str(tf), needs=[getDigiTaskName("EMC")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + EMCRECOtask = add_reco_task(stages, 'EMC', needs=[digi_task_name("EMC", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') EMCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-emcal-reco-workflow --input-type digits --output-type cells --infile emcaldigits.root ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(EMCRECOtask) - PHSRECOtask = 
createTask(name='phsreco_'+str(tf), needs=[getDigiTaskName("PHS")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + PHSRECOtask = add_reco_task(stages, 'PHS', needs=[digi_task_name("PHS", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') PHSRECOtask['cmd'] = '${O2_ROOT}/bin/o2-phos-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(PHSRECOtask) - CPVRECOtask = createTask(name='cpvreco_'+str(tf), needs=[getDigiTaskName("CPV")], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + CPVRECOtask = add_reco_task(stages, 'CPV', needs=[digi_task_name("CPV", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') CPVRECOtask['cmd'] = '${O2_ROOT}/bin/o2-cpv-reco-workflow ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(CPVRECOtask) - if args.with_ZDC: - ZDCRECOtask = createTask(name='zdcreco_'+str(tf), needs=[getDigiTaskName("ZDC")], tf=tf, cwd=timeframeworkdir, lab=["ZDC"]) - ZDCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-zdc-digits-reco ' + getDPL_global_options() + putConfigValues() - workflow['stages'].append(ZDCRECOtask) + ZDCRECOtask = add_reco_task(stages, 'ZDC', needs=[digi_task_name("ZDC", tf)], tf=tf, cwd=timeframeworkdir, lab=["ZDC"]) + ZDCRECOtask['cmd'] = '${O2_ROOT}/bin/o2-zdc-digits-reco ' + getDPL_global_options() + putConfigValues() ## forward matching - MCHMIDMATCHtask = createTask(name='mchmidMatch_'+str(tf), needs=[MCHRECOtask['name'], MIDRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + MCHMIDMATCHtask = add_match_task(stages, "MCH-MID", needs=[reco_task_name("MCH", tf), reco_task_name("MID", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MCHMIDMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-muon-tracks-matcher-workflow ' + getDPL_global_options() - workflow['stages'].append(MCHMIDMATCHtask) - MFTMCHMATCHtask = createTask(name='mftmchMatch_'+str(tf), needs=[MCHMIDMATCHtask['name'], MFTRECOtask['name']], tf=tf, cwd=timeframeworkdir, 
lab=["RECO"], mem='1500') + MFTMCHMATCHtask = add_match_task(stages, "MFT-MCH", needs=[match_task_name("MCH-MID", tf), reco_task_name("MFT", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MFTMCHMATCHtask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValuesNew(['ITSAlpideConfig','MFTAlpideConfig'],{"FwdMatching.useMIDMatch":"true"}) if args.fwdmatching_assessment_full == True: MFTMCHMATCHtask['cmd']+= ' | o2-globalfwd-assessment-workflow ' MFTMCHMATCHtask['cmd']+= getDPL_global_options() - workflow['stages'].append(MFTMCHMATCHtask) - if args.fwdmatching_save_trainingdata == True: - MFTMCHMATCHTraintask = createTask(name='mftmchMatchTrain_'+str(tf), needs=[MCHMIDMATCHtask['name'], MFTRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') + if args.fwdmatching_save_trainingdata == True: # TODO This seems to be exactly the same as the previous task + MFTMCHMATCHTraintask = add_task(stages, f'mftmchMatchTrain_{tf}', needs=[match_task_name("MCH-MID", tf), reco_task_name("MFT", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='1500') MFTMCHMATCHTraintask['cmd'] = '${O2_ROOT}/bin/o2-globalfwd-matcher-workflow ' + putConfigValuesNew(['ITSAlpideConfig','MFTAlpideConfig'],{"FwdMatching.useMIDMatch":"true"}) MFTMCHMATCHTraintask['cmd']+= getDPL_global_options() - workflow['stages'].append(MFTMCHMATCHTraintask) - - pvfinderneeds = [ITSTPCMATCHtask['name']] - if isActive('FT0'): - pvfinderneeds += [FT0RECOtask['name']] - if isActive('TOF'): - pvfinderneeds += [TOFTPCMATCHERtask['name']] - if isActive('MFT'): - pvfinderneeds += [MFTRECOtask['name']] - if isActive('MCH'): - pvfinderneeds += [MCHRECOtask['name']] - if isActive('TRD'): - pvfinderneeds += [TRDTRACKINGtask2['name']] - if isActive('FDD'): - pvfinderneeds += [FDDRECOtask['name']] - if isActive('MFT') and isActive('MCH'): - pvfinderneeds += [MFTMCHMATCHtask['name']] - - # Take None as default, we only add more if nothing from anchorConfig - pvfinder_sources = 
anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertexing-sources", None) - pvfinder_matching_sources = anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertex-track-matching-sources", None) - if not pvfinder_sources: - pvfinder_sources = "ITS,ITS-TPC,ITS-TPC-TRD,ITS-TPC-TOF" - if isActive("MID"): - pvfinder_sources += ",MID" - pvfinderneeds += [MIDRECOtask['name']] - if not pvfinder_matching_sources: - pvfinder_matching_sources = "ITS,MFT,TPC,ITS-TPC,MCH,MFT-MCH,TPC-TOF,TPC-TRD,ITS-TPC-TRD,ITS-TPC-TOF" - if isActive("MID"): - pvfinder_matching_sources += ",MID" - pvfinderneeds += [MIDRECOtask['name']] - - PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=pvfinderneeds, tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000') + + pvfinder_sources = make_sources_string_reco_match("ITS,ITS-TPC,ITS-TPC-TRD,ITS-TPC-TOF,MID", tf, anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertexing-sources", None)) + pvfinder_matching_sources = make_sources_string_reco_match("ITS,MFT,TPC,ITS-TPC,MCH,MFT-MCH,TPC-TOF,TPC-TRD,ITS-TPC-TRD,ITS-TPC-TOF,MID", tf, anchorConfig.get("o2-primary-vertexing-workflow-options",{}).get("vertex-track-matching-sources", None)) + + PVFINDERtask = add_task(stages, f'pvfinder_{tf}', needs=[*match_task_name("ANY", tf), *reco_task_name("ANY", tf)], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=NWORKERS, mem='4000', condition=pvfinder_sources and pvfinder_matching_sources) PVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-primary-vertexing-workflow ' \ + getDPL_global_options() + putConfigValuesNew(['ITSAlpideParam','MFTAlpideParam', 'pvertexer', 'TPCGasParam'], {"NameConf.mDirMatLUT" : ".."}) - PVFINDERtask['cmd'] += ' --vertexing-sources ' + pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(PVFINDERtask) + PVFINDERtask['cmd'] += ' --vertexing-sources ' + 
pvfinder_sources + ' --vertex-track-matching-sources ' + pvfinder_matching_sources + (' --combine-source-devices', '')[args.no_combine_dpl_devices] if includeFullQC or includeLocalQC: def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): - task = createTask(name=taskName + '_local' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000') + task = add_task(stages, taskName + f'_local_{tf}', needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000') objectsFile = objectsFile if len(objectsFile) > 0 else taskName + '.root' # the --local-batch argument will make QC Tasks store their results in a file and merge with any existing objects task['cmd'] = f'{readerCommand} | o2-qc --config {configFilePath}' + \ @@ -940,23 +1107,23 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): ' ' + getDPL_global_options() # Prevents this task from being run for multiple TimeFrames at the same time, thus trying to modify the same file. 
task['semaphore'] = objectsFile - workflow['stages'].append(task) ### MFT # to be enabled once MFT Digits should run 5 times with different configurations for flp in range(5): addQCPerTF(taskName='mftDigitsQC' + str(flp), - needs=[getDigiTaskName("MFT")], + needs=[digi_task_name("MFT", tf)], readerCommand='o2-qc-mft-digits-root-file-reader --mft-digit-infile=mftdigits.root', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-digit-' + str(flp) + '.json', objectsFile='mftDigitsQC.root') + addQCPerTF(taskName='mftClustersQC', - needs=[MFTRECOtask['name']], + needs=[reco_task_name("MFT", tf)], readerCommand='o2-global-track-cluster-reader --track-types none --cluster-types MFT', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-cluster.json') addQCPerTF(taskName='mftAsyncQC', - needs=[MFTRECOtask['name']], + needs=[reco_task_name("MFT", tf)], readerCommand='o2-global-track-cluster-reader --track-types MFT --cluster-types MFT', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/qc-mft-async.json') @@ -966,42 +1133,41 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): # readerCommand=, # configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-tracking-direct.json') addQCPerTF(taskName='tpcStandardQC', - needs=[TPCRECOtask['name']], - readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --tpc-native-cluster-reader "--infile tpc-native-clusters.root" --input-type clusters,tracks', - # readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --input-type tracks', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-standard-direct.json') + needs=[reco_task_name("TPC", tf)], + readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --tpc-native-cluster-reader "--infile tpc-native-clusters.root" --input-type clusters,tracks', + # readerCommand='o2-tpc-file-reader --tpc-track-reader "--infile tpctracks.root" --input-type tracks', + 
configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tpc-qc-standard-direct.json') ### TRD addQCPerTF(taskName='trdDigitsQC', - needs=[TRDDigitask['name']], + needs=[digi_task_name("TRD", tf)], readerCommand='o2-trd-trap-sim', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/trd-digits-task.json') ### TOF addQCPerTF(taskName='tofDigitsQC', - needs=[getDigiTaskName("TOF")], + needs=[digi_task_name("TOF", tf)], readerCommand='${O2_ROOT}/bin/o2-tof-reco-workflow --input-type digits --output-type none', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofdigits.json', objectsFile='tofDigitsQC.root') addQCPerTF(taskName='tofft0PIDQC', - needs=[ITSTPCMATCHtask['name'], TOFRECOtask['name'], FT0RECOtask['name']], + needs=[match_task_name("ITS-TPC", tf), reco_task_name("TOF", tf), reco_task_name("FT0", tf)], readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC,TPC,ITS-TPC-TOF,TPC-TOF" --cluster-types FT0', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/pidft0tof.json') addQCPerTF(taskName='tofPIDQC', - needs=[ITSTPCMATCHtask['name'], TOFRECOtask['name']], + needs=[match_task_name("ITS-TPC", tf), reco_task_name("TOF", tf)], readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC,TPC,ITS-TPC-TOF,TPC-TOF" --cluster-types none', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/pidtof.json') ### EMCAL - if isActive('EMC'): - addQCPerTF(taskName='emcCellQC', - needs=[EMCRECOtask['name']], - readerCommand='o2-emcal-cell-reader-workflow --infile emccells.root', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/emc-cell-task.json') + addQCPerTF(taskName='emcCellQC', + needs=[reco_task_name("EMC", tf)], + readerCommand='o2-emcal-cell-reader-workflow --infile emccells.root', + configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/emc-cell-task.json') ### FT0 addQCPerTF(taskName='RecPointsQC', - needs=[FT0RECOtask['name']], + needs=[reco_task_name("FT0", tf)], readerCommand='o2-ft0-recpoints-reader-workflow --infile 
o2reco_ft0.root', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/ft0-reconstruction-config.json') @@ -1011,22 +1177,20 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): readerCommand='o2-primary-vertex-reader-workflow', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/vertexing-qc-direct-mc.json') addQCPerTF(taskName='ITSTPCmatchQC', - needs=[ITSTPCMATCHtask['name']], + needs=[match_task_name("ITS-TPC", tf)], readerCommand='o2-global-track-cluster-reader --track-types "TPC,ITS-TPC"', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/ITSTPCmatchedTracks_direct_MC.json') - if isActive('TOF'): - addQCPerTF(taskName='TOFMatchQC', - needs=[TOFTPCMATCHERtask['name']], - readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC" --cluster-types none', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_ITSTPCTOF_TPCTOF_direct_MC.json') - if isActive('TOF') and isActive('TRD'): - addQCPerTF(taskName='TOFMatchWithTRDQC', - needs=[TOFTPCMATCHERtask['name']], - readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC,ITS-TPC-TRD,ITS-TPC-TRD-TOF,TPC-TRD,TPC-TRD-TOF" --cluster-types none', - configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_AllTypes_direct_MC.json') + addQCPerTF(taskName='TOFMatchQC', + needs=[match_task_name("ITS-TPC", tf), match_task_name("TPC-TOF", tf)], + readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC" --cluster-types none', + configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_ITSTPCTOF_TPCTOF_direct_MC.json') + addQCPerTF(taskName='TOFMatchWithTRDQC', + needs=[match_task_name("TPC-TOF", tf), match_task_name("ITS-TPC", tf), reco_task_name("TRD", tf)], + readerCommand='o2-global-track-cluster-reader --track-types "ITS-TPC-TOF,TPC-TOF,TPC,ITS-TPC-TRD,ITS-TPC-TRD-TOF,TPC-TRD,TPC-TRD-TOF" --cluster-types none', + 
configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/tofMatchedTracks_AllTypes_direct_MC.json') ### ITS addQCPerTF(taskName='ITSTrackSimTask', - needs=[ITSRECOtask['name']], + needs=[reco_task_name("ITS", tf)], readerCommand='o2-global-track-cluster-reader --track-types "ITS" --cluster-types "ITS"', configFilePath='json://${O2DPG_ROOT}/MC/config/QC/json/its-mc-tracks-qc.json') @@ -1036,49 +1200,24 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): if COLTYPE == "PbPb" or (doembedding and COLTYPEBKG == "PbPb"): svfinder_threads = ' --threads 3 ' svfinder_cpu = 3 - SVFINDERtask = createTask(name='svfinder_'+str(tf), needs=[PVFINDERtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000') + svfinder_sources = make_sources_string_reco_match("ITS,ITS-TPC,TPC-TRD,TPC-TOF,ITS-TPC-TRD,ITS-TPC-TOF,MID", tf, anchorConfig.get("o2-secondary-vertexing-workflow-options",{}).get("vertexing-sources", None)) + SVFINDERtask = add_task(stages, f'svfinder_{tf}', needs=[PVFINDERtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu=svfinder_cpu, mem='5000', condition=svfinder_sources) SVFINDERtask['cmd'] = '${O2_ROOT}/bin/o2-secondary-vertexing-workflow ' SVFINDERtask['cmd'] += getDPL_global_options(bigshm=True) + svfinder_threads + putConfigValuesNew(['svertexer'], {"NameConf.mDirMatLUT" : ".."}) - # Take None as default, we only add more if nothing from anchorConfig - svfinder_sources = anchorConfig.get("o2-secondary-vertexing-workflow-options",{}).get("vertexing-sources", None) - if not svfinder_sources: - svfinder_sources = "ITS,ITS-TPC,TPC-TRD,TPC-TOF,ITS-TPC-TRD,ITS-TPC-TOF" - if isActive("MID"): - svfinder_sources += ",MID" - SVFINDERtask['cmd'] += ' --vertexing-sources ' + svfinder_sources + (' --combine-source-devices','')[args.no_combine_dpl_devices] - workflow['stages'].append(SVFINDERtask) + SVFINDERtask['cmd'] += ' --vertexing-sources ' + svfinder_sources + (' --combine-source-devices', 
'')[args.no_combine_dpl_devices] # ----------- # produce AOD # ----------- - # TODO This needs further refinement, sources and dependencies should be constructed dynamically - aodinfosources = 'ITS,MFT,MCH,TPC,ITS-TPC,MFT-MCH,ITS-TPC-TOF,TPC-TOF,FT0,FDD,CTP,TPC-TRD,ITS-TPC-TRD,EMC' - aodneeds = [PVFINDERtask['name'], SVFINDERtask['name']] - if isActive('FV0'): - aodneeds += [ FV0RECOtask['name'] ] - aodinfosources += ',FV0' - if isActive('TOF'): - aodneeds += [ TOFRECOtask['name'] ] - if isActive('TRD'): - aodneeds += [ TRDTRACKINGtask2['name'] ] - if isActive('EMC'): - aodneeds += [ EMCRECOtask['name'] ] - if isActive('CPV'): - aodneeds += [ CPVRECOtask['name'] ] - if isActive('PHS'): - aodneeds += [ PHSRECOtask['name'] ] - if isActive('MID'): - aodneeds += [ MIDRECOtask['name'] ] - aodinfosources += ',MID' - if args.with_ZDC and isActive('ZDC'): - aodneeds += [ ZDCRECOtask['name'] ] - aodinfosources += ',ZDC' + aodneeds = [PVFINDERtask['name'], SVFINDERtask['name'], digi_task_name("CTP", tf)] if usebkgcache: - aodneeds += [ BKG_KINEDOWNLOADER_TASK['name'] ] + aodneeds.append(BKG_KINEDOWNLOADER_TASK['name']) + aodinfosources = make_sources_string_reco_match('ITS,MFT,MCH,TPC,ITS-TPC,MFT-MCH,ITS-TPC-TOF,TPC-TOF,FT0,FDD,TPC-TRD,ITS-TPC-TRD,EMC,FV0,MID,ZDC', tf, anchorConfig.get("o2-aod-producer-workflow-options",{}).get("info-sources", None)) + AODtask = add_task(stages, f'aod_{tf}', needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1', condition=aodinfosources) + aodinfosources += ",CTP" aod_df_id = '{0:03}'.format(tf) - AODtask = createTask(name='aod_'+str(tf), needs=aodneeds, tf=tf, cwd=timeframeworkdir, lab=["AOD"], mem='4000', cpu='1') AODtask['cmd'] = ('','ln -nfs ../bkg_Kine.root . 
;')[doembedding] AODtask['cmd'] += '[ -f AO2D.root ] && rm AO2D.root; ${O2_ROOT}/bin/o2-aod-producer-workflow --reco-mctracks-only 1 --aod-writer-keep dangling --aod-writer-resfile AO2D' # next line needed for meta data writing (otherwise lost) @@ -1088,7 +1227,7 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): if args.run_anchored == False: AODtask['cmd'] += ' --aod-timeframe-id ${ALIEN_PROC_ID}' + aod_df_id AODtask['cmd'] += ' ' + getDPL_global_options(bigshm=True) - AODtask['cmd'] += ' --info-sources ' + anchorConfig.get("o2-aod-producer-workflow-options",{}).get("info-sources",str(aodinfosources)) + AODtask['cmd'] += ' --info-sources ' + aodinfosources AODtask['cmd'] += ' --lpmp-prod-tag ${ALIEN_JDL_LPMPRODUCTIONTAG:-unknown}' AODtask['cmd'] += ' --anchor-pass ${ALIEN_JDL_LPMANCHORPASSNAME:-unknown}' AODtask['cmd'] += ' --anchor-prod ${ALIEN_JDL_MCANCHOR:-unknown}' @@ -1096,8 +1235,6 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): if environ.get('O2DPG_AOD_NOTRUNCATE') != None or environ.get('ALIEN_JDL_O2DPG_AOD_NOTRUNCATE') != None: AODtask['cmd'] += ' --enable-truncation 0' # developer option to suppress precision truncation - workflow['stages'].append(AODtask) - # AOD merging / combination step (as individual stages) --> for the moment deactivated in favor or more stable global merging """ aodmergerneeds = [ AODtask['name'] ] @@ -1126,30 +1263,30 @@ def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''): # taking away digits, clusters and other stuff as soon as possible. 
# TODO: cleanup by labels or task names if args.early_tf_cleanup == True: - TFcleanup = createTask(name='tfcleanup_'+str(tf), needs= [ AOD_merge_task['name'] ], tf=tf, cwd=timeframeworkdir, lab=["CLEANUP"], mem='0', cpu='1') + TFcleanup = add_task(stages, f'tfcleanup_{tf}', needs=[AOD_merge_task['name']], tf=tf, cwd=timeframeworkdir, lab=["CLEANUP"], mem='0', cpu='1') TFcleanup['cmd'] = 'rm *digi*.root;' TFcleanup['cmd'] += 'rm *cluster*.root' - workflow['stages'].append(TFcleanup); # AOD merging as one global final step aodmergerneeds = ['aod_' + str(tf) for tf in range(1, NTIMEFRAMES + 1)] -AOD_merge_task = createTask(name='aodmerge', needs = aodmergerneeds, lab=["AOD"], mem='2000', cpu='1') +AOD_merge_task = add_task(stages, 'aodmerge', needs=aodmergerneeds, lab=["AOD"], mem='2000', cpu='1') AOD_merge_task['cmd'] = ' [ -f aodmerge_input.txt ] && rm aodmerge_input.txt; ' AOD_merge_task['cmd'] += ' for i in `seq 1 ' + str(NTIMEFRAMES) + '`; do echo "tf${i}/AO2D.root" >> aodmerge_input.txt; done; ' AOD_merge_task['cmd'] += ' o2-aod-merger --input aodmerge_input.txt --output AO2D.root' -workflow['stages'].append(AOD_merge_task) job_merging = False if includeFullQC: - workflow['stages'].extend(include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag)) + add_predefined_tasks(stages, include_all_QC_finalization(ntimeframes=NTIMEFRAMES, standalone=False, run=args.run, productionTag=args.productionTag)) if includeAnalysis: # include analyses and potentially final QC upload tasks - add_analysis_tasks(workflow["stages"], needs=[AOD_merge_task["name"]], is_mc=True) + analysis_tasks = [] + add_analysis_tasks(analysis_tasks, needs=[AOD_merge_task["name"]], is_mc=True) if QUALITYCONTROL_ROOT: - add_analysis_qc_upload_tasks(workflow["stages"], args.productionTag, args.run, "passMC") + add_analysis_qc_upload_tasks(analysis_tasks, args.productionTag, args.run, "passMC") + add_predefined_tasks(stages, analysis_tasks) 
-dump_workflow(workflow["stages"], args.o) +dump_workflow(stages, args.o) exit (0)