# runHarvesting.py (forked from CMS-HGCAL/ntuple-tools)
import multiprocessing
import sys
import python.file_manager as fm
import traceback
import ROOT
import os
import subprocess32
from shutil import copyfile
import optparse
import logging
import time
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)

# Sentinel value put on the work queues to signal that no more items will follow.
sentinel = -1
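
# Overview of the harvesting pipeline implemented below (a summary of this file,
# not an official description): four processes run in parallel and are chained
# by multiprocessing queues:
#   data_creator  -> polls the input directory and copies matching files locally
#   data_checker  -> opens each copy with ROOT and marks good files as '.checked'
#   data_consumer -> merges batches of good files with 'hadd' into a temporary file
#   data_mover    -> ships each merged chunk to the output (EOS) directory
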
def data_creator(input_dir, sample_name, version, q):
    """
    Polls input_dir for files matching sample_name and version, copies new or
    changed ones locally, and puts their names on the queue for the checker.
    """
    logger.info('Creating data and putting it on the queue')
    ncopied = 0
    while True:
        data = fm.listFiles(input_dir)
        for id, item in enumerate(data):
            file_name = os.path.split(item)[1]
            # print file_name
            # print id
            if sample_name in item and version + '_' in item:
                # or not os.path.isfile('{}.checked'.format(os.path.splitext(file)[0])):
                if os.path.isfile(file_name):
                    if not os.path.isfile('{}.checked'.format(os.path.splitext(file_name)[0])):
                        # logger.debug('file {} exists but check failed...'.format(file_name))
                        remote_checksum = fm.get_checksum(item)
                        local_checksum = fm.get_checksum(file_name)
                        if remote_checksum == local_checksum:
                            logger.debug(' remote checksum for file: {} did not change...skipping for now'.format(file_name))
                            continue
                        else:
                            logger.info(' remote checksum for file: {} changed: will copy it again'.format(file_name))
                    else:
                        continue
                copy_ret = fm.copy_from_eos(input_dir=input_dir,
                                            file_name=file_name,
                                            target_file_name=file_name,
                                            dowait=True,
                                            silent=True)
                logger.debug('copy returned: {}'.format(copy_ret))
                if copy_ret == 0:
                    q.put(file_name)
                    ncopied += 1
                    if ncopied > 999:
                        q.put(sentinel)
                        break
        if ncopied > 999:
            break
        time.sleep(20)
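
# Note on the marker files coordinating the stages (as read from the code above
# and below): data_checker touches '<file>.checked' once a copy passes the ROOT
# integrity check, and data_consumer touches '<file>.hadded' once the copy has
# been merged; data_creator uses the '.checked' marker, plus a checksum
# comparison against the remote copy, to decide whether a file must be copied
# again.
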
def data_checker(queue_all, queue_ready):
    """
    Pops copied files from queue_all, verifies that they can be read by ROOT,
    marks good ones as '.checked' and puts those not yet merged on queue_ready.
    """
    logger.info('Checking files and putting them on the queue "queue_ready"')
    while True:
        data = queue_all.get()
        if data is sentinel:
            queue_ready.put(sentinel)
            break
        # print('data found to be processed: {}'.format(data))
        file = ROOT.TFile(os.path.join(fm.get_eos_protocol(data), data))
        if len(file.GetListOfKeys()) == 0:
            logger.info('file: {} is not OK'.format(data))
        else:
            fname = '{}.checked'.format(os.path.splitext(data)[0])
            open(fname, 'a').close()
            if not os.path.isfile('{}.hadded'.format(os.path.splitext(data)[0])):
                queue_ready.put(data)
            else:
                logger.debug('file: {} has already been hadded...skipping it'.format(data))
        file.Close()
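
# The integrity check above opens the file through the protocol prefix returned
# by fm.get_eos_protocol() and treats an empty key list as a broken file. A
# minimal stand-alone sketch of the same check, assuming a local file named
# 'example.root' (hypothetical name):
#
#   f = ROOT.TFile('example.root')
#   is_ok = len(f.GetListOfKeys()) > 0
#   f.Close()
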
def data_consumer(sample_name, version, queue_ready, queue_tomove):
    """
    Merges batches of verified files with hadd and queues each merged chunk
    for the data mover.
    """
    logger.info('Starting data consumer')
    out_file_name = '{}_temp.root'.format(sample_name, version)
    new_data = []
    index = 0
    while True:
        data = queue_ready.get()
        if data is sentinel:
            queue_tomove.put(sentinel)
            break
        new_data.append(data)
        if len(new_data) >= 20:
            logger.info('Launch hadd on {} files: '.format(len(new_data)))
            hadd_proc = subprocess32.Popen(['hadd', '-a', '-j', '2', '-k', out_file_name] + new_data,
                                           stdout=subprocess32.PIPE,
                                           stderr=subprocess32.STDOUT)
            hadd_proc.wait()
            if hadd_proc.returncode == 0:
                logger.info(' hadd succeeded with exit code: {}'.format(hadd_proc.returncode))
                logger.debug(' hadd output follows: {}'.format(hadd_proc.stdout.readlines()))
                index += 1
                for file in new_data:
                    fname = '{}.hadded'.format(os.path.splitext(file)[0])
                    open(fname, 'a').close()
                out_file_name_copy = '{}_tocopy_{}.root'.format(sample_name, index)
                copyfile(out_file_name, out_file_name_copy)
                queue_tomove.put(out_file_name_copy)
                del new_data[:]
                logger.debug(' resetting file list for hadd operation to {}'.format(len(new_data)))
            else:
                logger.info(' hadd failed with exit code: {}'.format(hadd_proc.returncode))
                logger.debug(' hadd output follows: {}'.format(hadd_proc.stdout.readlines()))
                file = ROOT.TFile(out_file_name)
                if len(file.GetListOfKeys()) == 0:
                    logger.info('file: {} is not OK'.format(out_file_name))
                else:
                    logger.info('file: {} is OK, will retry hadding!'.format(out_file_name))
                file.Close()
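
# The hadd call above merges each batch of 20 files into the running output
# file. The flags are standard ROOT hadd options (see 'hadd -h' for the
# authoritative list): '-a' appends to an existing output file, '-j 2'
# parallelizes the merge over two processes, and '-k' skips corrupt or missing
# input files instead of aborting. The equivalent shell command would look
# roughly like this (file names are placeholders):
#
#   hadd -a -j 2 -k sample_temp.root input_1.root input_2.root ...
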
def data_mover(sample_name, version, out_dir, queue_tomove):
    """
    Copies each merged chunk received on queue_tomove to the output directory.
    """
    logger.info('Starting data mover')
    while True:
        data = queue_tomove.get()
        if data is sentinel:
            break
        out_file_name = '{}t.root'.format(sample_name)
        fm.copy_to_eos(data, out_dir, out_file_name)
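
# Note: every chunk is shipped via fm.copy_to_eos() under the same target name
# built from the sample name, so successive copies reuse the same destination
# name in out_dir.
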
def main():
    usage = ('usage: %prog [options]\n'
             + '%prog -h for help')
    parser = optparse.OptionParser(usage)
    parser.add_option('-i', '--input-dir',
                      dest='INPUTDIR',
                      help='input directory (can be an EOS path)')
    parser.add_option('-s', '--sample-name',
                      dest='FILENAMEBASE',
                      help='name of the sample file base (part of the file-name)')
    parser.add_option('-v', '--version',
                      dest='VERSION',
                      help='version of the processing (part of the file-name)')
    parser.add_option('-o', '--output-dir',
                      dest='OUTPUTDIR',
                      help='output directory (can be an EOS path)')

    global opt, args
    (opt, args) = parser.parse_args()

    input_dir = opt.INPUTDIR
    version = opt.VERSION
    sample_name = opt.FILENAMEBASE
    out_dir = opt.OUTPUTDIR

    logger.info('Starting...')

    q = multiprocessing.Queue()
    queue_ready = multiprocessing.Queue()
    queue_tomove = multiprocessing.Queue()

    # r1 = pool.apply_async(func=data_creator, args=(input_dir, sample_name, version, q))
    # r2 = pool.apply_async(func=data_checker, args=(q, queue_ready))

    processes = []
    processes.append(multiprocessing.Process(target=data_creator,
                                             args=(input_dir, sample_name, version, q)))
    processes.append(multiprocessing.Process(target=data_checker,
                                             args=(q, queue_ready)))
    processes.append(multiprocessing.Process(target=data_consumer,
                                             args=(sample_name, version, queue_ready, queue_tomove)))
    processes.append(multiprocessing.Process(target=data_mover,
                                             args=(sample_name, version, out_dir, queue_tomove)))

    for proc in processes:
        proc.start()

    q.close()
    q.join_thread()
    queue_ready.close()
    queue_ready.join_thread()
    queue_tomove.close()
    queue_tomove.join_thread()

    for proc in processes:
        proc.join()

    return 0
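
# Illustrative invocation using the options defined above (all values are
# placeholders, not taken from the repository):
#
#   python runHarvesting.py -i <input dir or EOS path> -s <sample file-name base> \
#       -v <version string> -o <output dir or EOS path>
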
if __name__ == '__main__':
    try:
        status = main()
        sys.exit(status)
    except Exception as inst:
        print(str(inst))
        print("Unexpected error:", sys.exc_info()[0])
        traceback.print_exc()
        sys.exit(100)