forked from mimbert/g5k_bench_flops
-
Notifications
You must be signed in to change notification settings - Fork 0
/
g5k_prepare_bench_flops
executable file
·131 lines (119 loc) · 6.2 KB
/
g5k_prepare_bench_flops
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
import sys, os
from execo import Remote, Put, Get, SequentialActions, Report
from execo_g5k import get_g5k_clusters, get_cluster_site, OarSubmission, g5k_configuration
from execo_engine import logger, ParamSweeper
from g5k_cluster_engine import g5k_cluster_engine, worker_log
from os.path import join as pjoin
from os.path import exists as pexists
from common import *
class g5k_prepare_bench_flops(g5k_cluster_engine):
    """Engine precompiling the flops benchmark packages, one job per cluster.

    For every requested cluster, a one-node OAR job is submitted and the
    packages listed in ``_PACKAGES`` are built on that node, in order. The
    resulting archives are fetched into ``self.prepare_path`` (the
    ``preparation`` directory under the engine directory).
    """

    # Build order. The dependency archives of a package are read from
    # self.prepare_path, so they must have been produced earlier in this
    # sequence (or by a previous run).
    _PACKAGES = ("atlas", "openmpi", "hpl")

    def __init__(self):
        """Register the command line options and arguments of the engine."""
        super(g5k_prepare_bench_flops, self).__init__()
        self.options_parser.set_usage("usage: %prog <comma separated list of clusters>")
        self.options_parser.set_description("precompile package for benching flops")
        self.options_parser.add_option("-o", dest="oar_options",
                                       help="oar reservation options",
                                       default=None)
        self.options_parser.add_option("-w", dest="walltime",
                                       help="walltime of compilation jobs",
                                       type="string", default="8:0:0")
        self.options_parser.add_option("-T", dest="testing",
                                       action="store_true", default=False,
                                       help="use api branch testing")
        self.options_parser.add_argument("clusters", "comma separated list of clusters. ALL for all clusters")
        self.prepare_path = pjoin(self.engine_dir, "preparation")

    def init(self):
        """Check the arguments and create the sweeper of clusters to prepare.

        Exits with status 1 when the cluster list argument is not given
        exactly once. Only clusters for which at least one prepared archive
        is missing in ``self.prepare_path`` are scheduled.
        """
        if len(self.args) != 1:
            # print(...) with a single string behaves the same on Python 2 and 3.
            print("ERROR: missing argument")
            self.options_parser.print_help(file=sys.stderr)
            sys.exit(1)
        if self.options.testing:
            g5k_configuration['api_additional_args'].append("branch=testing")
        try:
            os.makedirs(self.prepare_path)
        except OSError:
            # An already existing directory is the expected case on a re-run;
            # anything else (permissions, a regular file in the way, ...) is
            # a real error that must not be hidden.
            if not os.path.isdir(self.prepare_path):
                raise
        # Ignore empty names produced by stray commas ("a,,b" or "a,").
        clusters = set(name for name in self.args[0].split(",") if name)
        if "ALL" in clusters:
            clusters.remove("ALL")
            clusters.update(get_g5k_clusters())
        clusters_todo = [cluster for cluster in clusters
                         if not self._is_prepared(cluster)]
        self.sweeper = ParamSweeper(pjoin(self.result_dir, "prepare_params"),
                                    clusters_todo,
                                    save_sweeps=True)
        logger.info("cluster = %s" % (clusters_todo,))

    def _is_prepared(self, cluster):
        """Return True when every package archive for `cluster` exists locally."""
        return all(pexists(pjoin(self.prepare_path, prepared_archive(package, cluster)))
                   for package in self._PACKAGES)

    def get_clusters(self):
        """Return the clusters which still have to be processed."""
        return self.sweeper.get_remaining()

    def get_job(self, cluster):
        """Reserve the combination of `cluster` and describe its OAR job.

        Returns None when the sweeper has nothing left for this cluster,
        otherwise a tuple (OarSubmission, combination).
        """
        # A list comprehension (rather than filter()) yields a list on both
        # Python 2 and Python 3.
        comb = self.sweeper.get_next(
            filtr=lambda remaining: [c for c in remaining if c == cluster])
        if not comb:
            return None
        submission = OarSubmission(resources="{cluster='%s'}/nodes=1" % (cluster,),
                                   walltime=self.options.walltime,
                                   name="flopscompilworker",
                                   job_type=["allow_classic_ssh"],
                                   additional_options=self.options.oar_options)
        return submission, comb

    def worker(self, cluster, site, comb, nodes, worker_index, oarsubmission, jobid):
        """Build all missing packages for `cluster` on the reserved nodes.

        The combination is marked done only once every package has been
        built and retrieved. On the first failure the remaining packages are
        skipped and the combination is cancelled, so that the sweeper never
        records a partially prepared cluster as done.

        `site`, `worker_index` and `oarsubmission` are not used here.
        """
        if jobid is None or not nodes:
            self.sweeper.cancel(comb)
            return
        for package in self._PACKAGES:
            if pexists(pjoin(self.prepare_path, prepared_archive(package, cluster))):
                continue
            if not self._prepare_package(package, cluster, nodes):
                # Later packages may need this one's archive: stop here.
                self.sweeper.cancel(comb)
                return
        self.sweeper.done(comb)

    def _prepare_package(self, package, cluster, nodes):
        """Compile `package` on `nodes` and fetch the results locally.

        Uploads the package archive, its node_prepare_<package> script and
        the prepared archives of its dependencies, runs the script, then
        retrieves the compilation stdout and, if compilation succeeded, the
        prepared archive.

        Returns True when both compilation and retrieval succeeded, False
        otherwise. The sweeper state is left to the caller.
        """
        archive = prepared_archive(package, cluster)
        dep_archives = [prepared_archive(dep, cluster)
                        for dep in packages[package]["deps"]]

        worker_log.info("preparing package %s" % package)
        worker_log.info("copy %s files to nodes" % package)
        upload_files = ([pjoin(self.engine_dir, packages[package]["archive"]),
                         pjoin(self.engine_dir, "node_prepare_" + package)] +
                        [pjoin(self.prepare_path, dep) for dep in dep_archives])
        preparation = SequentialActions(
            [Remote("mkdir -p " + node_working_dir, nodes),
             Put(nodes,
                 local_files=upload_files,
                 remote_location=node_working_dir)])
        preparation.run()
        if not preparation.ok:
            worker_log.info("aborting, copy of %s files failed:\n%s" % (package, Report([preparation]).to_string()))
            return False

        worker_log.info("compile %s" % package)
        compil = Remote(
            "%s/node_prepare_%s %s/%s %s %s %s > %s/%s.stdout" % (
                node_working_dir,
                package,
                node_working_dir,
                packages[package]["archive"],
                packages[package]["extract_dir"],
                archive,
                " ".join(dep_archives),
                node_working_dir,
                archive),
            nodes)
        compil.run()
        if not compil.ok:
            worker_log.info("%s compilation failed:\n%s" % (package, Report([compil]).to_string()))

        # The stdout of the compilation is fetched even after a failed
        # build; the archive itself only when the build succeeded.
        worker_log.info("retrieve result of %s compilation" % package)
        remote_files = [pjoin(node_working_dir, archive + ".stdout")]
        if compil.ok:
            remote_files.append(pjoin(node_working_dir, archive))
        retrieval = Get(nodes,
                        remote_files=remote_files,
                        local_location=self.prepare_path)
        retrieval.run()
        if not retrieval.ok:
            # Best effort: drop a possibly truncated archive so that a later
            # run does not mistake it for a valid one.
            try:
                os.unlink(pjoin(self.prepare_path, archive))
            except OSError:
                pass
            worker_log.info("aborting, retrieval of %s compilation results failed:\n%s" % (package, Report([retrieval]).to_string()))
            return False
        if not compil.ok:
            return False
        worker_log.info("finished compilation of package %s" % package)
        return True
if __name__ == "__main__":
    # Script entry point: build the engine and call its start() method.
    g5k_prepare_bench_flops().start()