-
Notifications
You must be signed in to change notification settings - Fork 2
/
g5k_prepare_bench_flops
executable file
·143 lines (131 loc) · 6.92 KB
/
g5k_prepare_bench_flops
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
import sys, os
from execo import Remote, Put, Get, SequentialActions, Report
from execo_g5k import get_g5k_clusters, get_cluster_site, OarSubmission, g5k_configuration, deploy, Deployment
from execo_engine import logger, ParamSweeper
from g5k_cluster_engine import g5k_cluster_engine, worker_log
from os.path import join as pjoin
from os.path import exists as pexists
from common import *
class g5k_prepare_bench_flops(g5k_cluster_engine):
    """Grid'5000 engine that pre-compiles the flops benchmark packages
    (atlas, openmpi, hpl) on a set of clusters.

    For each cluster still missing a prepared archive, it submits a one
    node OAR job (optionally with a deployed environment), copies the
    package sources to the node, compiles them there, and retrieves the
    resulting archives into ``self.prepare_path``.
    """

    def __init__(self):
        super(g5k_prepare_bench_flops, self).__init__()
        self.args_parser.usage = "usage: %(prog)s <comma separated list of clusters>"
        self.args_parser.description = "precompile package for benching flops"
        self.args_parser.add_argument("-o", dest = "oar_options", help = "oar reservation options", default = None)
        self.args_parser.add_argument("-w", dest = "walltime", help = "walltime of compilation jobs", type = str, default = "8:0:0")
        self.args_parser.add_argument("-D", dest = "deploy_env", help = "environment used to compile packages (DEFAULT=\"\" No deployment)", type = str, default = "")
        self.args_parser.add_argument("-T", dest = "testing", action = "store_true", default = False,
                                      help = "use api branch testing")
        self.args_parser.add_argument("clusters", help = "comma separated list of clusters. ALL for all clusters")
        # directory where the compiled per-cluster archives are collected
        self.prepare_path = pjoin(self.engine_dir, "preparation")

    def init(self):
        """Validate arguments and build the persistent list of clusters
        that still need at least one package compiled."""
        if not vars(self.args):
            print("ERROR: missing argument")
            self.args_parser.print_help(file=sys.stderr)
            # sys.exit instead of the builtin exit(): the builtin is an
            # interactive-session helper and may be absent under -S
            sys.exit(1)
        if self.args.testing:
            g5k_configuration['api_additional_args']['branch'] = "testing"
        try:
            os.makedirs(self.prepare_path)
        except OSError:
            # directory already exists; narrowed from a bare except so
            # unrelated errors (e.g. KeyboardInterrupt) are not swallowed
            pass
        clusters = set(self.args.clusters.split(","))
        if "ALL" in clusters:
            clusters.remove("ALL")
            clusters.update(get_g5k_clusters())
        # a cluster remains "todo" while any of the three prepared
        # archives is missing from the preparation directory
        clusters_todo = [
            cluster
            for cluster in clusters
            if any(not pexists(pjoin(self.prepare_path, prepared_archive(pkg, cluster)))
                   for pkg in ("atlas", "openmpi", "hpl")) ]
        self.sweeper = ParamSweeper(pjoin(self.result_dir, "prepare_params"),
                                    clusters_todo,
                                    save_sweeps = True)
        logger.info("cluster = %s" % (clusters_todo,))

    def get_clusters(self):
        """Return the clusters whose compilation is not yet done."""
        return self.sweeper.get_remaining()

    def get_job(self, cluster):
        """Return an (OarSubmission, comb) pair reserving one node of
        *cluster*, or None if nothing remains to do for it."""
        # materialize the matching combs as a list rather than a lazy
        # filter object, so the sweeper sees a concrete sequence
        comb = self.sweeper.get_next(filtr = lambda r: [c for c in r if c == cluster])
        if not comb:
            return None
        # a deployment job is only needed when a compilation environment
        # was explicitly requested with -D
        if self.args.deploy_env == "":
            job_type = ["allow_classic_ssh"]
        else:
            job_type = ["deploy"]
        submission = OarSubmission(resources = "{cluster='%s'}/nodes=1" % (cluster,),
                                   walltime = self.args.walltime,
                                   name = "flopscompilworker",
                                   job_type = job_type,
                                   additional_options = self.args.oar_options)
        return submission, comb

    def worker(self, cluster, site, comb, nodes, worker_index, oarsubmission, jobid):
        """Compile, on the reserved node, every package whose prepared
        archive is still missing for *cluster*, and retrieve the results."""
        if jobid is None or len(nodes) == 0:
            # reservation failed: release the comb for a later retry
            self.sweeper.cancel(comb)
            return

        def prepare_package(package):
            # one compile-and-retrieve cycle for a single package
            if not self.args.deploy_env == "":
                worker_log.info("deploy compilation environment %s" % self.args.deploy_env)
                deployed, undeployed = deploy(Deployment(nodes, env_name = self.args.deploy_env))
                worker_log.info("%i deployed, %i undeployed" % (len(deployed), len(undeployed)))
                if len(deployed) < 1:
                    worker_log.error("deployment failed")
                    return
            worker_log.info("preparing package %s" % package)
            worker_log.info("copy %s files to nodes" % package)
            # push the source archive, the node-side build script, and the
            # already-prepared archives of the package's dependencies
            preparation = SequentialActions(
                [ Remote("mkdir -p " + node_working_dir,
                         nodes),
                  Put(nodes,
                      local_files = ([ pjoin(self.engine_dir, packages[package]["archive"]),
                                       pjoin(self.engine_dir, "node_prepare_" + package) ] +
                                     [ pjoin(self.prepare_path, prepared_archive(dep, cluster)) for dep in packages[package]["deps"] ]),
                      remote_location = node_working_dir)])
            preparation.run()
            if not preparation.ok:
                worker_log.info("aborting, copy of %s files failed:\n%s" % (package, Report([preparation]).to_string()))
                self.sweeper.cancel(comb)
                return
            worker_log.info("compile %s" % package)
            compil = Remote(
                "%s/node_prepare_%s %s/%s %s %s %s > %s/%s.stdout" % (
                    node_working_dir,
                    package,
                    node_working_dir,
                    packages[package]["archive"],
                    packages[package]["extract_dir"],
                    prepared_archive(package, cluster),
                    " ".join([ prepared_archive(dep, cluster) for dep in packages[package]["deps"] ]),
                    node_working_dir,
                    prepared_archive(package, cluster)),
                nodes)
            compil.run()
            if not compil.ok:
                worker_log.info("%s compilation failed:\n%s" % (package, Report([compil]).to_string()))
                self.sweeper.cancel(comb)
            worker_log.info("retrieve result of %s compilation" % package)
            # always fetch the compilation log; fetch the archive only on success
            remote_files = [ pjoin(node_working_dir, prepared_archive(package, cluster) + ".stdout") ]
            if compil.ok:
                remote_files.append(pjoin(node_working_dir, prepared_archive(package, cluster)))
            retrieval = Get(
                nodes,
                remote_files = remote_files,
                local_location = self.prepare_path)
            retrieval.run()
            if not retrieval.ok:
                # drop a possibly partial local archive
                try:
                    os.unlink(pjoin(self.prepare_path, prepared_archive(package, cluster)))
                except OSError:
                    pass
                # NOTE(review): on retrieval failure the comb is neither
                # cancelled nor marked done -- confirm this is intended
                worker_log.info("aborting, retrieval of %s compilation results failed:\n%s" % (package, Report([retrieval]).to_string()))
                return
            if compil.ok and retrieval.ok:
                worker_log.info("finished compilation of package %s" % package)
                self.sweeper.done(comb)

        for package in ["atlas", "openmpi", "hpl"]:
            # skip packages whose prepared archive already exists locally
            if not pexists(pjoin(self.prepare_path, prepared_archive(package, cluster))):
                prepare_package(package)
if __name__ == "__main__":
    # instantiate the preparation engine and hand control to it
    engine = g5k_prepare_bench_flops()
    engine.start()