-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrun_config.py
663 lines (523 loc) · 21.5 KB
/
run_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
import importlib
from pathlib import Path
import sys
import time
import os
import glob
import traceback
import socket
import numpy as np
import pandas as pd
import yaml
import evaluation.loader as dl
from builtins import Exception
import pickle
import dill
from telegram.ext.updater import Updater
from telegram.ext.commandhandler import CommandHandler
import random
import gc
# Telegram notifications (disabled by default).
# To enable them, uncomment the setup below, fill in CHAT_ID and BOT_TOKEN,
# and set NOTIFY to True. The send_* helpers in this module reference
# `updater` and `CHAT_ID` whenever NOTIFY is true.
# CHAT_ID = -1
# BOT_TOKEN = ''
# updater = Updater(BOT_TOKEN)
# updater.start_polling()
NOTIFY = False  # switch checked by send_message, send_exception, send_file and status
def _load_config( path ):
    '''
    Parse one YAML configuration file
        --------
        path: string or Path
            Location of the configuration file
        --------
        returns: the parsed configuration (a dict for the configs handled by run_file)
    '''
    # yaml.load() requires an explicit Loader since PyYAML 6. FullLoader is what
    # PyYAML 5.x used when none was given; older releases only have yaml.Loader.
    # NOTE(review): these loaders are not meant for untrusted files - consider
    # yaml.safe_load if the configurations never use custom tags.
    loader = getattr( yaml, 'FullLoader', yaml.Loader )
    with open( str( path ) ) as stream:
        return yaml.load( stream, Loader=loader )


def _archive_config( path, out, suffix ):
    '''
    Move a processed configuration file into the output folder
        --------
        path: string
            Path of the configuration file
        out: string
            Output folder
        suffix: string
            State marker appended after the timestamp, e.g. '.done' or '.error'
    '''
    target = out + '/' + Path( path ).name + str( time.time() ) + suffix
    ensure_dir( target )
    os.rename( path, target )


def main( conf, out=None ):
    '''
    Execute experiments for the given configuration path
        --------
        conf: string
            Configuration path. Can be a single file or a folder.
        out: string
            Output folder path for endless run listening for new configurations.
            With a folder as conf, processed files are moved there and the
            function never returns. With a single file it is only used to
            move the file away after an abort or an error.
        --------
        Single-file mode and folder mode without out end with sys.exit().
    '''
    print( 'Checking {}'.format( conf ) )
    # updater.dispatcher.add_handler( CommandHandler('status', status) )

    file = Path( conf )
    if file.is_file():

        print( 'Loading file' )
        send_message( 'processing config ' + conf )
        c = _load_config( file )

        try:
            run_file( c )
            send_message( 'finished config ' + conf )

        except (KeyboardInterrupt, SystemExit):
            send_message( 'manually aborted config ' + conf )
            # '.cancled' spelling kept as-is, external tooling may match on it
            if out is not None:
                _archive_config( conf, out, '.cancled' )
            raise

        except Exception:
            print( 'error for config ', conf )
            if out is not None:
                _archive_config( conf, out, '.error' )
            send_exception( 'error for config ' + conf )
            traceback.print_exc()

        sys.exit()

    if file.is_dir():

        if out is not None:
            # endless mode: poll the folder and move every handled file to out
            ensure_dir( out + '/out.txt' )
            send_message( 'waiting for configuration files in ' + conf )

            while True:

                print( 'waiting for configuration files in ', conf )

                pending = glob.glob( conf + '/' + '*.yml' )
                if pending:
                    current = pending[0]
                    try:
                        print( 'processing config', current )
                        send_message( 'processing config ' + current )

                        run_file( _load_config( current ) )

                        print( 'finished config', current )
                        send_message( 'finished config ' + current )
                        _archive_config( current, out, '.done' )

                    except (KeyboardInterrupt, SystemExit):
                        send_message( 'manually aborted config ' + current )
                        _archive_config( current, out, '.cancled' )
                        raise

                    except Exception:
                        print( 'error for config ', current )
                        _archive_config( current, out, '.error' )
                        send_exception( 'error for config ' + current )
                        traceback.print_exc()

                time.sleep( 5 )

        else:
            # one-shot mode: run every configuration found right now, then exit
            print( 'processing folder ', conf )

            for config_path in glob.glob( conf + '/' + '*.yml' ):
                try:
                    print( 'processing config', config_path )
                    send_message( 'processing config ' + config_path )

                    run_file( _load_config( config_path ) )

                    print( 'finished config', config_path )
                    send_message( 'finished config ' + config_path )

                except (KeyboardInterrupt, SystemExit):
                    send_message( 'manually aborted config ' + config_path )
                    raise

                except Exception:
                    # a failing configuration must not stop the remaining ones
                    print( 'error for config ', config_path )
                    send_exception( 'error for config ' + config_path )
                    traceback.print_exc()

            sys.exit()
def run_file( conf ):
    '''
    Dispatch one parsed configuration to the runner matching its type
        --------
        conf: dict
            Configuration dictionary. conf['type'] selects the mode
            ('single', 'window' or 'opt'); any other value is only reported
            on stdout.
    '''
    mode = conf['type']
    if mode == 'single':
        run_single( conf )
        return
    if mode == 'window':
        run_window( conf )
        return
    if mode == 'opt':
        run_opt( conf )
        return
    print( mode + ' not supported' )
def run_single( conf, slice = None ):
    '''
    Train and evaluate all configured algorithms on one train/test split
        --------
        conf: dict
            Configuration dictionary
        slice: int
            Optional index for the window slice. It is passed to the data
            loader and reused as iteration marker for the result files.
    '''
    print( 'run test single' )

    algorithms = create_algorithms_dict( conf['algorithms'] )
    metrics = create_metric_list( conf['metrics'] )
    evaluation = load_evaluation( conf['evaluation'] )

    data_conf = conf['data']

    # extra loader options are optional in the configuration
    load_opts = data_conf['opts'] if 'opts' in data_conf else {}
    train, test = dl.load_data_session( data_conf['folder'], data_conf['prefix'], slice_num=slice, **load_opts )

    # buy actions are only loaded when both keys are configured
    if 'buys' in data_conf and 'file_buys' in data_conf:
        buys = dl.load_buys( data_conf['folder'], data_conf['file_buys'] )
    else:
        buys = pd.DataFrame()

    for metric in metrics:
        metric.init( train )
        if hasattr( metric, 'set_buys' ):
            metric.set_buys( buys, test )

    results = {}
    for name, instance in algorithms.items():
        eval_algorithm( train, test, name, instance, evaluation, metrics, results, conf, slice=slice, iteration=slice )

    print_results( results )
    write_results_csv( results, conf, iteration=slice )
def run_opt_single( conf, iteration, globals ):
    '''
    Run one optimization iteration and track the best score per algorithm class
        --------
        conf: dict
            Configuration dictionary
        iteration: int
            Index of the current optimization run, used for the result file
            name and the notification text
        globals: dict
            State shared between iterations. Receives one {'key', 'best'}
            entry per algorithm class, and the results of this run are
            appended to globals['results'].
    '''
    print( 'run test opt single' )

    algorithms = create_algorithms_dict( conf['algorithms'] )
    for instance in algorithms.values():
        algo_class = type( instance )
        if algo_class not in globals:
            globals[algo_class] = { 'key': '', 'best': -1 }

    metrics = create_metric_list( conf['metrics'] )
    # the optimization metric is put first; results[key][0] is read as the score below
    metrics = create_metric( conf['optimize'] ) + metrics
    evaluation = load_evaluation( conf['evaluation'] )

    data_conf = conf['data']
    train_eval = data_conf['train_eval'] if 'train_eval' in data_conf else True
    load_opts = data_conf['opts'] if 'opts' in data_conf else {}
    train, test = dl.load_data_session( data_conf['folder'], data_conf['prefix'], train_eval=train_eval, **load_opts )

    for metric in metrics:
        metric.init( train )

    results = {}
    for name, instance in algorithms.items():
        eval_algorithm( train, test, name, instance, evaluation, metrics, results, conf, iteration=iteration, out=False )

    write_results_csv( results, conf, iteration=iteration )

    for name, instance in algorithms.items():
        record = globals[type( instance )]
        current_value = results[name][0][1]
        if record['best'] < current_value:
            print( 'found new best configuration' )
            print( name )
            print( 'improvement from {} to {}'.format( record['best'], current_value ) )
            send_message( 'improvement for {} from {} to {} in test {}'.format( name, record['best'], current_value, iteration ) )
            record['best'] = current_value
            record['key'] = name

    globals['results'].append( results )

    # drop the local references before the next iteration and force a collection
    del algorithms
    del metrics
    del evaluation
    del results
    gc.collect()
def run_window( conf ):
    '''
    Run the single-split evaluation once for every configured slice
        --------
        conf: dict
            Configuration dictionary. conf['data']['slices'] is the number of
            slices; the optional conf['data']['skip'] lists slice indices to
            leave out.
    '''
    print( 'run test window' )

    data_conf = conf['data']
    slices = [ *range( data_conf['slices'] ) ]

    skipped = data_conf['skip'] if 'skip' in data_conf else ()
    for index in skipped:
        slices.remove( index )

    for index in slices:
        label = str( index )
        print( 'start run for slice ', label )
        send_message( 'start run for slice ' + label )
        run_single( conf, slice = index )
def run_opt( conf ):
    '''
    Perform a random-search optimization for the algorithms
        --------
        conf: dict
            Configuration dictionary. conf['optimize']['iterations'] (default 100)
            is the end of the iteration range and
            conf['optimize']['iterations_skip'] (default 0) its start.
    '''
    opt_conf = conf['optimize'] if 'optimize' in conf else {}
    iterations = opt_conf['iterations'] if 'iterations' in opt_conf else 100
    start = opt_conf['iterations_skip'] if 'iterations_skip' in opt_conf else 0
    print( 'run opt with {} iterations starting at {}'.format( iterations, start ) )

    # state shared by all iterations, filled by run_opt_single
    globals = { 'results': [] }

    for run in range( start, iterations ):
        print( 'start random test ', str( run ) )
        run_opt_single( conf, run, globals )

    # merge the per-iteration results; later iterations win on equal keys
    global_results = {}
    for results in globals['results']:
        global_results.update( results.items() )

    write_results_csv( global_results, conf )
def eval_algorithm( train, test, key, algorithm, eval, metrics, results, conf, slice=None, iteration=None, out=True ):
    '''
    Fit and evaluate one single algorithm
        --------
        train : Dataframe
            Training data
        test: Dataframe
            Test set
        key: string
            The automatically created key string for the algorithm
        algorithm: algorithm object
            Just the algorithm object, e.g., ContextKNN
        eval: module
            The module for evaluation, e.g., evaluation.evaluation_last
        metrics: list of Metric
            Metric objects handed to the evaluation. Metrics offering
            start/stop hooks are notified around the training phase.
        results: dict
            Result dictionary, the evaluation output is stored under key
        conf: dict
            Configuration dictionary
        slice: int
            Optional index for the window slice
        iteration: int
            Optional run index added to the result file name
        out: bool
            Write a result file for this algorithm right after the evaluation
    '''
    ts = time.time()
    print( 'fit ', key )
    #send_message( 'training algorithm ' + key )

    if hasattr( algorithm, 'init' ):
        algorithm.init( train, test, slice=slice )

    for m in metrics:
        if hasattr( m, 'start' ):
            m.start( algorithm )

    algorithm.fit( train, test )
    print( key, ' time: ', ( time.time() - ts ) )

    if 'results' in conf and 'pickle_models' in conf['results']:
        # saving the model is best effort, a failure must not abort the run
        try:
            save_model( key, algorithm, conf )
        except Exception:
            print( 'could not save model for ' + key )

    for m in metrics:
        # check for the hook that is actually called: a metric with only a
        # start hook must not crash and one with only a stop hook must be stopped
        if hasattr( m, 'stop' ):
            m.stop( algorithm )

    results[key] = eval.evaluate_sessions( algorithm, metrics, test, train, conf=conf, key=key )
    if out:
        write_results_csv( { key: results[key] }, conf, extra=key, iteration=iteration )

    #send_message( 'algorithm ' + key + ' finished ' + ( 'for slice ' + str(slice) if slice is not None else '' ) )
    #algorithm.clear()
def write_results_csv( results, conf, iteration=None, extra=None ):
    '''
    Write the result array to a csv file, if a result folder is defined in the configuration
        --------
        results : dict
            Dictionary of all results res[algorithm_key][metric_key]
        conf: dict
            Configuration dictionary. Nothing is written unless
            conf['results']['folder'] is set.
        iteration: int
            Optional for the window mode, appended to the file name
        extra: string
            Optional string to add to the file name
    '''
    if 'results' not in conf or 'folder' not in conf['results']:
        return

    # NOTE(review): the folder is concatenated without a separator, so it
    # presumably has to end with '/' - confirm against the configurations
    export_csv = conf['results']['folder'] + 'test_' + conf['type'] + '_' + conf['key'] + '_' + conf['data']['name']
    if extra is not None:
        export_csv += '.' + str( extra )
    if iteration is not None:
        export_csv += '.' + str( iteration )
    export_csv += '.csv'

    ensure_dir( export_csv )

    # the context manager closes (and flushes) the file even if a write fails
    with open( export_csv, 'w+' ) as file:
        file.write( 'Metrics;' )

        # header row: metric names are taken from the first algorithm only
        for entries in results.values():
            for e in entries:
                file.write( e[0] )
                file.write( ';' )
            break

        file.write( '\n' )

        for k, entries in results.items():
            file.write( k )
            file.write( ';' )
            for e in entries:
                file.write( str( e[1] ) )
                file.write( ';' )
                # an optional third element holding a DataFrame goes to its own file
                if len( e ) > 2 and isinstance( e[2], pd.DataFrame ):
                    name = export_csv + '.' + e[0].replace( ':', '-' ).replace( ' ', '-' ) + '.csv'
                    e[2].to_csv( name, sep=";", index=False )
            file.write( '\n' )
def save_model( key, algorithm, conf ):
    '''
    Save the model object for reuse with FileModel
        --------
        key : string
            Key of the algorithm, becomes part of the file name
        algorithm : object
            The algorithm object that is serialized with dill
        conf : object
            Configuration dictionary, has to include results.folder
    '''
    file_name = conf['results']['folder'] + '/' + conf['key'] + '_' + conf['data']['name'] + '_' + key + '.pkl'
    file_name = Path( file_name )
    ensure_dir( file_name )
    # the context manager closes the file even when serialization fails
    with open( file_name, 'wb' ) as file:
        #pickle.dump(algorithm, file)
        dill.dump( algorithm, file )
def print_results( res ):
    '''
    Print one line per algorithm and metric to stdout
        --------
        res : dict
            Dictionary of all results res[algorithm_key][metric_key]; the
            first two elements of every entry are printed
    '''
    for algorithm_key, entries in res.items():
        for entry in entries:
            label, score = entry[0], entry[1]
            print( algorithm_key, ':', label, ' ', score )
def load_evaluation( module ):
    '''
    Import an evaluation module from the evaluation package
        --------
        module : string
            Just the last part of the path, e.g., evaluation_last
    '''
    module_path = 'evaluation.' + module
    return importlib.import_module( module_path )
def create_algorithms_dict( list ):
    '''
    Build the algorithm instances described in the configuration
        --------
        list : list of dicts
            Dicts represent a single algorithm with class, a key, and optionally a param dict.
            Optional params_opt entries are sampled randomly, an 'algorithms'
            entry inside params is resolved recursively (hybrids), and
            params_var creates one instance per listed variant.
        --------
        returns: dict mapping the generated key string to the instance
    '''
    algorithms = {}
    for spec in list:
        Class = load_class( 'algorithms.' + spec['class'] )

        has_params = 'params' in spec
        fixed_params = spec['params'] if has_params else {}
        # randomly drawn parameters override the fixed ones
        params = { **fixed_params, **generate_random_params( spec ) }

        # hybrids carry their sub-algorithms as configuration, replace them by instances
        is_hybrid = has_params and 'algorithms' in spec['params']
        sub_keys = []
        if is_hybrid:
            nested = create_algorithms_dict( spec['params']['algorithms'] )
            sub_keys = [ *nested.keys() ]
            params['algorithms'] = [ *nested.values() ]

        # the key encodes every parameter except 'file'; it is only extended
        # when the configuration has a params section
        key = spec['key'] if 'key' in spec else spec['class']
        if has_params:
            for name, val in params.items():
                if name == 'file':
                    continue
                if is_hybrid and name == 'algorithms':
                    for sub_key in sub_keys:
                        key += '-' + sub_key
                    continue
                key = ( key + '-' + str( name ) + "=" + str( val ) ).replace( ',', '_' )

        if 'params_var' in spec:
            for name, variants in spec['params_var'].items():
                for variant in variants:
                    # a set variant stays in params for the following instances
                    params[name] = variant
                    suffix = name
                    for part in variant:
                        suffix += '-' + str( part )
                    algorithms[key + suffix] = Class( **params )
        else:
            algorithms[key] = Class( **params )

    return algorithms
def generate_random_params( algorithm ):
    '''
    Draw one random value for every parameter of the optional params_opt section
        --------
        algorithm : dict
            Algorithm configuration. params_opt maps a parameter name to
            either a list of candidates (plain values, nested lists or range
            dicts) or a single range dict, e.g.
            {from: 0.0, to: 0.9, in: 10, type: float}.
        --------
        returns: dict with the chosen value per parameter. Entries of an
        unsupported type are reported and left out.
    '''
    params = {}
    if 'params_opt' not in algorithm:
        return params

    for key, value in algorithm['params_opt'].items():
        space = []
        if isinstance( value, list ):
            # flatten all candidates into one search space
            for entry in value:
                if isinstance( entry, list ):
                    space += entry
                elif isinstance( entry, dict ):  # range
                    space += list( create_linspace( entry ) )
                else:
                    space.append( entry )
            chosen = random.choice( space )
        elif isinstance( value, dict ):  # range
            if 'space' in value:
                # the generated vector is the only candidate, so it is chosen as a whole
                if value['space'] == 'weight':
                    space.append( create_weightspace( value ) )
                elif value['space'] == 'recLen':
                    space.append( create_linspace( value ) )
            else:
                space = create_linspace( value )
            chosen = random.choice( space )
            chosen = float( chosen ) if 'type' in value and value['type'] == 'float' else chosen
        else:
            # nothing was drawn for this entry: skip it instead of reusing the
            # value of the previous parameter (or failing on an unbound name)
            print( 'not the right type' )
            continue

        params[key] = chosen

    return params
def create_weightspace(value):
    '''
    Draw a random weight vector whose entries add up to (about) one
        --------
        value : dict
            value['num'] is the number of weights
        --------
        returns: list with num entries. All but the last are random draws
        rounded to two decimals, the last one is the remainder to 1.
    '''
    count = value['num']
    weights = []
    total = 0
    for _ in range(count - 1):  # all weights excluding the last one
        candidate = 1
        # redraw until the running total stays below 1
        while total + candidate >= 1:
            candidate = round(np.random.rand(), 2)
        weights.append(candidate)
        total += candidate

    weights.append(round(1 - total, 2))  # last weight
    return weights
def create_linspace( value ):
    '''
    Create an evenly spaced search space from a range description
        --------
        value : dict
            Range with the keys from, to and in (number of steps) and an
            optional type used as numpy dtype (default float32)
    '''
    dtype = value['type'] if 'type' in value else 'float32'
    grid = np.linspace( value['from'], value['to'], num=value['in'] )
    return grid.astype( dtype )
def create_metric_list( list ):
    '''
    Build the flat list of metric instances for the configured metrics
        --------
        list : list of dicts
            Dicts represent a single metric with class and optionally the list length
    '''
    return [ instance for spec in list for instance in create_metric( spec ) ]
def create_metric( metric ):
    '''
    Create the instances for one metric entry of the configuration
        --------
        metric : dict
            Metric description with class (path below evaluation.metrics) and
            optionally length, a list of values that each produce one
            instance created with that value as single argument
        --------
        returns: list of metric instances
    '''
    Class = load_class( 'evaluation.metrics.' + metric['class'] )
    if 'length' not in metric:
        return [ Class() ]
    return [ Class( list_length ) for list_length in metric['length'] ]
def load_class( path ):
    '''
    Resolve a dotted path to the object it names
        --------
        path : string
            Path to the class, e.g., algorithms.knn.cknn.ContextKNNN.
            Everything before the last dot is imported as module, the rest
            is looked up as attribute of that module.
    '''
    parts = path.rsplit( '.', 1 )
    module_name, class_name = parts
    module = importlib.import_module( module_name )
    return getattr( module, class_name )
def ensure_dir(file_path):
    '''
    Create all directories in the file_path if non-existent.
        --------
        file_path : string or Path
            Path to the a file. Only its directory part is created; a bare
            file name without directory needs nothing and is accepted.
    '''
    directory = os.path.dirname(file_path)
    # an empty directory part means the current folder, os.makedirs('') would fail
    if directory and not os.path.exists(directory):
        # exist_ok covers a concurrent creation between the check and the call
        os.makedirs(directory, exist_ok=True)
def send_message( text ):
    '''
    Send a text notification, prefixed with the host name, if NOTIFY is enabled
        --------
        text : string
            Message body
    '''
    if not NOTIFY:
        return
    # relies on the module-level updater and CHAT_ID of the telegram setup
    body = 'News from ' + socket.gethostname() + ': \n' + text
    updater.bot.send_message( chat_id=CHAT_ID, text=body )
def send_exception( text ):
    '''
    Send a message plus the current traceback as file, if NOTIFY is enabled
        --------
        text : string
            Message sent before the traceback file
        --------
        The traceback of the exception being handled is written to
        exception.txt in the working directory and sent from there.
    '''
    if NOTIFY:
        send_message( text )
        # the context manager closes the file before it is read again for sending
        with open( 'exception.txt', 'w' ) as tmpfile:
            traceback.print_exc( file=tmpfile )
        send_file( 'exception.txt' )
def send_file( file ):
    '''
    Send a file as document, if NOTIFY is enabled
        --------
        file : string
            Path of the file to send
    '''
    if NOTIFY:
        # the context manager closes the file even when the upload fails;
        # relies on the module-level updater and CHAT_ID of the telegram setup
        with open( file, 'rb' ) as document:
            updater.bot.send_document( chat_id=CHAT_ID, document=document )
def status(bot, update):
    '''
    Reply with the host name this process runs on, if NOTIFY is enabled
        --------
        bot : object
            Unused
        update : object
            Incoming update, the reply goes to update.message
    '''
    if not NOTIFY:
        return
    host = socket.gethostname()
    update.message.reply_text( 'Running on {}'.format( host ) )
# Command line entry point: <config file or folder> [<output folder>]
if __name__ == '__main__':
    if len( sys.argv ) <= 1:
        print( 'File or folder expected.' )
    elif len( sys.argv ) > 2:
        main( sys.argv[1], out=sys.argv[2] )
    else:
        main( sys.argv[1], out=None )