print('Please stand by, the submission is being initialized; this may take a moment.\n')
import json
import logging
import os
import subprocess
import itertools
import sys
import inspect
import tempfile
import time
from collections import OrderedDict
import numpy as np
import straxen
# Note: the job name used to be scanner_{name}_{config}, but SLURM limits the
# length of a job name, so it was shortened to scan_{name} below.
# The mem-per-cpu argument has been unreliable on dali, possibly because too
# many resources were requested.
JOB_HEADER = """#!/bin/bash
#SBATCH --job-name=scan_{name}
#SBATCH --ntasks=1
#SBATCH --cpus-per-task={n_cpu}
#SBATCH --time={max_hours}:00:00
#SBATCH --partition={partition}
#SBATCH --account=pi-lgrandi
#SBATCH --mem-per-cpu={mem-per-cpu}
#SBATCH --qos={partition}
#SBATCH --output={log_fn}
#SBATCH --error={log_fn}
{extra_header}
# Conda
. "{conda_dir}/etc/profile.d/conda.sh"
{conda_dir}/bin/conda activate {env_name}
echo Starting scanner
python {python_file} {run_id} {target} {data_path} {config_file} {xenon1t}
"""
def scan_parameters(target,
parameter,
name,
register=None,
output_directory='./strax_data',
job_config=None,
log_directory='./parameter_scan',
xenon1t=False,
**kwargs):
"""Called in mystuff.py to run specified jobs.
This main function goes through and runs jobs based on the options
given in strax. Currently this is constructed to call
submit_setting, which then eventually starts a job and proceeds to
get a dataframe of event_info, thereby processing all data for
specified runs.
Params:
strax_options: a dictionary which must include:
- 'run_id' (default is '180215_1029')
- 'config' (strax configuration options)
- A directory to save the job logs in. Default is current
directory in a new file.
- kwargs: max_hours, extra_header, n_cpu, ram (typical dali job
sumission arguments apply.)
"""
    # Possible improvements:
    # TODO: this function does not support changing the context. Is that needed?
assert 'run_id' in parameter.keys(), 'No run_id key found in parameters.'
config_list = make_config(parameter)
    print('WARNING: Please make sure that the subsequent settings do not change\n',
          'the byte size of the data_types when using different parameter settings.\n',
          'E.g. they must not change the data length of records or equivalent.\n',
          'This may break your cached numba functions!\n'
          )
    # It will happen from time to time that somebody messes up,
    # so make people explicitly confirm their submission before they start
    # hundreds of jobs.
_user_check()
    # Default job settings (displayed below for confirmation):
default_job_config = {'job_name': name,
'n_cpu': 2,
'max_hours': 1,
'mem-per-cpu': 8000,
'partition': 'dali',
'conda_dir': '/dali/lgrandi/strax/miniconda3',
'env_name': 'strax',
'extra_header': ''
}
# Update default settings:
if job_config:
for key, value in job_config.items():
if key in default_job_config.keys():
default_job_config[key] = value
else:
                raise ValueError(f'Job setting {key} is not supported. If you\n'
                                 'want to add an SBATCH option, please use the\n'
                                 '"extra_header" key.')
    # Let's print the settings and ask the user again:
print('\nYou specified the following settings for the batch jobs:')
for key, value in default_job_config.items():
print(f'{key}: {value}')
_user_check()
# Now we have to make sure that the batch node will know where to
# find our newly registered plugins.
if register:
if not isinstance(register, (list, tuple)):
register = [register]
# Creating a dictionary with all relevant information about the
# plugin.
reg = []
for p in register:
directory = os.path.dirname(inspect.getfile(p))
reg.append({'mod_path': directory,
'mod': p.__module__,
'p_name': p.__name__})
register=reg
# Check that directory exists, else make it
if not os.path.exists(log_directory):
os.makedirs(log_directory)
    # Submit each setting. The batch job re-invokes this file (scanner.py),
    # which then runs work() via the __main__ block below.
for i, config in enumerate(config_list):
print('Submitting %d with %s' % (i, config))
# Now we add this to our config and later to the json file.
config['register'] = register
submit_setting(i,
config.pop('run_id'),
target,
name,
config,
output_directory,
default_job_config,
log_directory,
xenon1t,
**kwargs
)
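
# A minimal usage sketch for scan_parameters (the run id, scanned option and
# values are hypothetical placeholders, not settings shipped with this file):
#
#   import scanner
#
#   scanner.scan_parameters(
#       target='event_info',
#       parameter={
#           'run_id': ['180215_1029'],
#           'some_strax_option': [1, 2, 3],   # hypothetical config option
#       },
#       name='my_scan',
#       job_config={'n_cpu': 4, 'max_hours': 2},
#   )
#
# make_config (below) expands the parameter lists into all combinations, and
# one batch job is submitted per combination.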
def _user_check():
not_correct_answer = True
answer = input('Are you sure you would like to submit these settings? (y/n)')
while not_correct_answer:
if answer=='y':
print('Proceeding with the submission.')
not_correct_answer = False
elif answer=='n':
            print('Aborting submission.')
raise SystemExit(0)
else:
            answer = input(f'{answer} was not a valid input; please answer y or n for yes/no. ')
def make_config(parameters_dict, be_quiet=False):
    """Expand a dict of {option: list of values} into a list of configs,
    one per combination of the given values."""
    # Convert any dict to an ordered dict; this keeps the printed output
    # a bit more user friendly.
    parameters = OrderedDict()
    for key, values in parameters_dict.items():
        if not be_quiet:
            print(key, values)
        parameters[key] = values
    keys = list(parameters.keys())
    values = list(parameters.values())
    # Make a meshgrid of all possible parameter combinations for the scan.
    combination_values = list(itertools.product(*values))
    strax_options = []
    # Enumerate all combinations to build the strax_options list used later.
    for i, value in enumerate(combination_values):
        if not be_quiet:
            print('Setting %d:' % i)
        config = {}
        for j, parameter in enumerate(value):
            if not be_quiet:
                print('\t', keys[j], parameter)
            config[keys[j]] = parameter
        strax_options.append(config)
    return strax_options
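
# Illustration of what make_config produces (hypothetical option name):
#
#   make_config({'run_id': ['180215_1029'], 'some_option': [1, 2]}, be_quiet=True)
#   # -> [{'run_id': '180215_1029', 'some_option': 1},
#   #     {'run_id': '180215_1029', 'some_option': 2}]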
def submit_setting(setting_number,
run_id,
target,
name,
config,
output_directory,
job_config,
log_directory,
xenon1t,
**kwargs):
"""
Submits a job with a certain setting.
Given input setting arguments, submit_setting will take a current
run and submit a job to dali that calles this code itself (so not
runs scanner.py directly.) First, files are created temporarily in
directory (arg) that document what you have submitted. These are
then used to submit a typical job to dali. This then bumps us down
to if __name__ == '__main__' since we call it directly. You could
alternatively switch this so that if we submit_setting we run a
different file.
Arguments:
run_id (str): run id of the run to submit the job for. Currently not
tested if multiple run id's work but they should.
config (dict): a dictionary of configuration settings to try (see
strax_options in mystuff.py for construction of these config
settings)
directory (str): Place where the job logs will get saved. These are
short files just with the parameters saved and log of what
happened, but can be useful in debugging.
**kwargs (optional): can include the following
- n_cpu : numbers of cpu for this job. Default 40. Decreasing
this to minimum needed will get your job to submit to dali
faster!
- max_hours: max hours to run job. Default 8
- name: the appendix you want to scan_{name} which shows up in
dali. Defaults "magician" if not specified so change if
you're not a wizard.
- mem_per_cpu: (MB) default is 4480 MB for RAM per CPU. May need
more to run big processing.
- partition: Default to dali
- conda_dir: Where is your environment stored? Default is
/dali/lgrandi/strax/miniconda3 (for backup strax env).
Change?
- env_name: Which conda env do you want, defaults to strax
inside conda_dir
"""
    # Append a short random suffix so that submitting many identical jobs
    # does not overwrite older files.
    suffix = tempfile.NamedTemporaryFile(delete=True, dir=log_directory).name[-5:]
    file_fn = log_directory + '/setting_' + str(setting_number) + '_' + suffix
job_fn = file_fn + '_job'
log_fn = file_fn + '_log'
config_fn = file_fn + '_conf'
# Lets add the job_config here, so we can later read it in again:
config['job_config'] = job_config
config['setting_number'] = setting_number
#Takes configuration parameters and dumps the stringed version into a file called config_fn
with open(config_fn, mode='w') as f:
json.dump(config, f)
with open(job_fn, mode='w') as f:
        # Fill in the job header template with the job settings and file paths.
f.write(JOB_HEADER.format(
**job_config,
log_fn=log_fn,
python_file=os.path.abspath(__file__),
config_file = config_fn,
name=name,
run_id=run_id,
target=target,
data_path=output_directory,
xenon1t=xenon1t
))
print("\tSubmitting sbatch %s" % job_fn)
result = subprocess.check_output(['sbatch', job_fn])
job_id = int(result.decode().split()[-1])
print("\tYou have job id %d" % job_id)
def work(run_id,
target,
config,
job_config,
output_folder='./strax_data',
register=None,
xenon1t=False,
**kwargs):
    """Process run_id up to target with the given strax config and
    registered plugins. Runs on the batch node via the __main__ block below."""
    if register:
        # First, register any user-supplied plugins:
if not isinstance(register, (list, tuple)):
register = [register]
reg = []
for p in register:
            if 'straxen.plugins.' not in p['mod_path']:
# Not part of straxen so add path:
sys.path.append(p['mod_path'])
            # Now get the plugin class:
            mod = __import__(p['mod'], fromlist=[p['p_name']])
            p = getattr(mod, p['p_name'])
reg.append(p)
register = reg
if xenon1t:
st = straxen.contexts.xenon1t_dali(
output_folder=output_folder,
)
else:
st = straxen.contexts.xenonnt_online(
output_folder=output_folder,
)
if register is not None:
st.register(register)
st.set_config(config)
st.make(run_id, target, max_workers=job_config['n_cpu'], **kwargs)
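
# For debugging, work() can also be called directly, bypassing sbatch
# (a sketch with hypothetical run id and config values):
#
#   import scanner
#
#   scanner.work(run_id='180215_1029',
#                target='event_info',
#                config={'some_option': 1},   # hypothetical strax option
#                job_config={'n_cpu': 2},
#                output_folder='./strax_data',
#                xenon1t=False)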
if __name__ == "__main__": #happens if submit_setting() is called
if len(sys.argv) == 1: # argv[0] is the filename
print('hi I am ', __file__)
scan_parameters()
elif len(sys.argv) == 6:
run_id = sys.argv[1]
target = sys.argv[2]
data_path = sys.argv[3]
config_fn = sys.argv[4]
xenon1t = sys.argv[5]
        print('Processing run', run_id, 'to', target,
              'with config file', config_fn, 'into', data_path)
# Reread the config file to grab the config parameters
with open(config_fn, mode='r') as f:
config = json.load(f)
        # Now we have to separate off all non-strax configs:
register=config.pop('register')
job_config=config.pop('job_config')
time0 = time.perf_counter()
# Add random sleep to avoid numba cache errors...
t = np.random.uniform(0,1)
time.sleep(1+t)
work(run_id=run_id,
target=target,
register=register,
output_folder=data_path,
config=config,
job_config=job_config,
             xenon1t=(xenon1t == 'True')  # argv entries are strings; avoid eval
)
time1 = time.perf_counter()
print('Job took: %.2fs'%(time1-time0))
# TODO: Clean up everything except for the log file?
else:
raise ValueError("Bad command line arguments")