#!/usr/bin/python
#
# lsa_flux_check - determine if a user of lsa_flux, lsa_fluxm, or lsa_fluxg
# is complying with the usage limits as documented at
# http://arc.research.umich.edu/flux-and-other-hpc-resources/flux/using-flux/lsas-public-flux-allocation/
#
# Note: this script ONLY checks the usage limits that are not automatically
# enforced by Moab.
#
# Currently, the checks are:
# * lsa_flux:
# (no checks currently done -- everything is enforced automatically by Moab)
# * lsa_fluxm:
# - Make sure walltime for each job is <= 1 week.
# - Make sure each job is not capable of being run under lsa_flux:
# cores > 24 or cores per node > 20 or mem > 96 GB or mem per node > 80 GB
# (note that the 20 core Standard Flux nodes have 96 GB RAM, not 80. But
# we loosen the check for lsa_fluxm to be 80 so users don't need to
# request more than 4 GB / core when using lsa_flux unless they choose
# to do so).
# * lsa_fluxg:
# - Make sure walltime for each job is <= 3 days.
# - Make sure each job requested exactly 1 GPU (not 0, not 2)
# * all allocations:
# - Make sure that there is not another allocation that the user has
# access to that the job could be running under.
#
INSTALL_ROOT = '/home/software/rhel6/lsa/flux-utils'
import site
site.addsitedir( "%s/lib/python2.6/site-packages/" % INSTALL_ROOT )
import sys
import os
import re
import string
import time
import pwd
import subprocess
import argparse
import cStringIO
import ldap
import daemon
import inspect
from collections import defaultdict
from torque import *
sys.path.append( INSTALL_ROOT + '/etc' )
from lsa_flux_check_config import *
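# Note: lsa_flux_check_config is expected to provide (at least)
# ignore_accounts, a dict mapping allocation names to lists of user names
# to skip (used by get_account_info() below).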
# scheduled check times (hours of the day):
scheduled_hours = [ 8, 12, 16, 22 ]
# account name and core counts
accounts = [ 'lsa_flux', 'lsa_fluxm', 'lsa_fluxg' ]
account_cores = { 'lsa_flux': 120, 'lsa_fluxm': 40, 'lsa_fluxg': 4 }
account_mem = { 'lsa_flux': '480 GB', 'lsa_fluxm': '1024 GB', 'lsa_fluxg': '16 GB' }
# "processor equivalents" for each allocation, in KB of memory:
account_pe = { 'lsa_flux': 4194304, 'lsa_fluxm': 26214400, 'lsa_fluxg': 4194304 }
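# Example: a 1-core lsa_flux job requesting 8 GB of memory (8388608 KB)
# counts as max( 1, 8388608 / 4194304 ) = 2 PE (see the PE calculation
# in main() below).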
# Maximum per-job wallclock time for each allocation.
# Note that the lsa_flux limit won't actually affect anything since Moab is
# configured with a maximum wallclock time of 28 days.
account_wclimit = { 'lsa_flux': 30*86400, 'lsa_fluxm': 7*86400, 'lsa_fluxg': 3*86400 }
# conversion factors for ss:mm:hh:d (reverse of d:hh:mm:ss walltime specification)
walltime_conv = [ 1, 60, 3600, 86400 ]
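# Example: a walltime of "1:02:03:04" (d:hh:mm:ss) splits and reverses to
# [ "04", "03", "02", "1" ], giving 4*1 + 3*60 + 2*3600 + 1*86400
# = 93784 seconds.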
# where to send reports; notifications to users are also cc:'d to this address
admin_email = '[email protected]'
user_accounts = defaultdict( list ) # user_accounts['bjensen'] = [ 'lsa_flux', 'stats_flux', 'FluxTraining_flux' ]
account_info = defaultdict( dict ) # account_info['stats_flux'] = { 'maxproc': 30, 'maxmem': 128000000 } maxmem is in KB
current_user = pwd.getpwuid( os.geteuid() ).pw_name
# Use libc's atoi() function per
# http://stackoverflow.com/questions/1665511/python-equivalent-to-atoi-atof
# ...because using regular expressions followed by int() seems overkill,
# and there are several points where we need to get integers that are at
# the beginning of strings.
import ctypes.util
libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
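# For example, libc.atoi( "2:ppn=8" ) returns 2, while libc.atoi( "ppn=8" )
# returns 0: atoi() parses leading digits and returns 0 when the string
# does not start with a number.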
def flux_support_staff( user ):
    match = "uid=%s,ou=People,dc=umich,dc=edu" % user
    try:
        directory = ldap.initialize( "ldap://ldap.umich.edu/" )
        results = directory.search_s( "ou=User Groups,ou=Groups,dc=umich,dc=edu", ldap.SCOPE_SUBTREE, "(cn=flux-support-staff)", [ "member" ] )
        for dn, entry in results:
            if 'member' in entry:
                members = entry['member']
                if match in members:
                    return True
    except ldap.LDAPError, e:
        print "Error talking to LDAP server:"
        print e
    return False
# Capture the script's stdout into a variable.
# Based on https://stackoverflow.com/questions/16571150/how-to-capture-stdout-output-from-a-python-function-call
class Capturing( list ):
    def __enter__( self ):
        self._stdout = sys.stdout
        sys.stdout = self._stringio = cStringIO.StringIO()
        return self
    def __str__( self ):
        return self._stringio.getvalue()
    def __exit__( self, *args ):
        sys.stdout = self._stdout
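# Usage (as in do_scheduled_report() below):
#     with Capturing() as output:
#         main( args )
#     report = str( output )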
def get_account_info():
    pipe = subprocess.Popen( [ '/opt/moab/bin/mdiag', '-a' ], stdout=subprocess.PIPE )
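    # "mdiag -a" output (format inferred from the regexes below): each
    # allocation record starts with a line beginning with the allocation
    # name; later lines may contain MAXPROC=<n>, MAXMEM=<MB>, and a
    # comma-separated "Users:" list.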
    alloc = 'default_flux'  # placeholder for un-used allocation
    for line in pipe.stdout:
        m = re.search( r'^([a-zA-Z0-9_]+)\s', line )
        if m:
            alloc = m.group(1)
        if alloc == 'default_flux' or alloc in accounts:
            continue
        if re.search( r'^fluxtraining_flux', alloc, re.I ):
            continue
        if re.search( r'^hpc\d+_flux', alloc, re.I ):
            continue
        m = re.search( r'MAXPROC=(\d+)', line )
        if m:
            maxproc = int( m.group(1) )
            account_info[alloc]['maxproc'] = maxproc
        m = re.search( r'MAXMEM=(\d+)', line )
        if m:
            maxmem = int( m.group(1) ) * 1024  # in KB
            account_info[alloc]['maxmem'] = maxmem
        m = re.search( r'Users:\s+(\S+)', line )
        if m:
            users = m.group(1)
            ignorelist = ignore_accounts.get( alloc, [] )
            for u in string.split( users, ',' ):
                if not u in ignorelist:
                    user_accounts[ u ].append( alloc )
    pipe.stdout.close()
    pipe.wait()
def main( args ):
    get_account_info()
    warnings = defaultdict( list )
    add_fluxm_tip = dict()
    flux_usage = defaultdict( dict )
    flux_runningjobcount_byuser = defaultdict( dict )
    flux_runningcorecount_byuser = defaultdict( dict )
    flux_runningmemcount_byuser = defaultdict( dict )
    flux_runningpecount_byuser = defaultdict( dict )
    flux_runningjobs = defaultdict( list )
    flux_cores = dict()
    flux_mem = dict()
    flux_pe = dict()
    flux_cm = dict()
    flux_totaljobcount = dict()
    for account in accounts:
        flux_cores[account] = 0
        flux_mem[account] = 0
        flux_pe[account] = 0
        flux_cm[account] = 0
        flux_totaljobcount[account] = 0
    pbs = PBS()
    pbs.connect( pbs.default() )
    sel_list = ATTROPL( None, "job_state", None, "R", BATCH_OP.EQ )
    jobs = pbs.selstat( sel_list, None )
    for job in jobs:
        attrib = job['attrib']
        if not 'Account_Name' in attrib:
            continue
        account = attrib['Account_Name']
        if not account in accounts:
            continue
        flux_totaljobcount[account] += 1
        user = attrib['Job_Owner'].split('@')[0]
        if not args.all and user != current_user:
            continue
        cores = -1
        r = attrib['Resource_List']
        if 'procs_bitmap' in r:
            # see http://docs.adaptivecomputing.com/torque/4-2-6/help.htm#topics/2-jobs/requestingRes.htm#procs_bitmap
            cores = len( r['procs_bitmap'] )
        if 'procs' in r:
            c = libc.atoi( r['procs'] )
            if c > cores and c > 0: cores = c
        maxppn = 0
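        # Example: nodes=2:ppn=8+1:ppn=4 yields 2*8 + 1*4 = 20 cores and
        # maxppn = 8; a bare count such as nodes=3 counts as 3 nodes with
        # ppn=1.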
        if 'nodes' in r:
            c = 0
            for nodespec in r['nodes'].split('+'):
                p = 1
                n = libc.atoi( nodespec )
                if n < 1: n = 1
                m = re.search( r'ppn=(\d+)', nodespec )
                if m:
                    p = int( m.group(1) )
                    if p > maxppn: maxppn = p
                if p < 1: p = 1
                c += n*p
            if c > cores and c > 0: cores = c
        if 'exec_host' in attrib:
            c = 0
            for host in attrib['exec_host'].split('+'):
                # we know how to handle host specs of the form nyx5678/3
                if not re.search( r'\w+/\d+', host ):
                    # something we don't know how to interpret, bail
                    c = 0
                    break
                c += 1
            if c > cores and c > 0: cores = c
        mem = -1
        if 'pmem' in r and cores > 0:
            mem = get_memory( r['pmem'] ) * cores
        if 'mem' in r:
            m = get_memory( r['mem'] )
            if m > mem and m > 0: mem = m
        pe = cores
        pe_mem = (mem * 1.0) / account_pe[account]
        if pe_mem > 0 and pe_mem > pe:
            pe = pe_mem
        id = job['name'].split('.')[0]
        #
        # Check per-job requested walltime limit:
        #
        walltime = sum( int(x) * walltime_conv[i] for i, x in enumerate( reversed( r['walltime'].split(":") ) ) )
        if walltime > account_wclimit[account]:
            msg = "%s job %s exceeds %s wallclock limit of %d hours: %s" % ( user, id, account, account_wclimit[account] / 3600, show_walltime( walltime ) )
            warnings[user].append(msg)
        #
        # Check for inappropriate use of lsa_fluxm
        # (check to see if job could run under lsa_flux):
        #
        if account == 'lsa_fluxm' and \
           cores > 0 and cores <= 24 and mem > 0 and mem <= 96 * 1024 * 1024:
            # This job can run under lsa_flux, if it has the right geometry:
            if cores <= 20 and mem <= 80 * 1024 * 1024:
                # The job can definitely fit on a single Standard Flux node:
                msg = "%s job %s looks like it should be running under lsa_flux instead of lsa_fluxm" % ( user, id )
                warnings[user].append(msg)
                add_fluxm_tip[user] = True
            elif 'procs' in r:
                # The job can fit on more than 1 Standard Flux node:
                msg = "%s job %s looks like it should be running under lsa_flux instead of lsa_fluxm" % ( user, id )
                warnings[user].append(msg)
                add_fluxm_tip[user] = True
            elif maxppn > 0 and maxppn <= 20:
                # The job can fit on more than 1 Standard Flux node:
                msg = "%s job %s looks like it should be running under lsa_flux instead of lsa_fluxm" % ( user, id )
                warnings[user].append(msg)
                add_fluxm_tip[user] = True
        #
        # Check for inappropriate use of lsa_fluxg
        # (not using a GPU, or using more than 1 GPU):
        #
        if account == 'lsa_fluxg':
            gpus = 0
            if 'nodes' in r:
                for nodespec in r['nodes'].split('+'):
                    m = re.search( r'gpus=(\d+)', nodespec )
                    if m:
                        gpus += int( m.group(1) )
            if gpus == 0:
                msg = "%s job %s is running under lsa_fluxg but has not requested any GPUs -- it should be running under lsa_flux instead" % ( user, id )
                warnings[user].append(msg)
            if gpus > 1:
                msg = "%s job %s is using too many GPUs (%d of a maximum of 1)" % ( user, id, gpus )
                warnings[user].append(msg)
        #
        # Check to see if the user has another allocation they could use
        # instead:
        #
        if user in user_accounts:
            better = []
            for a in user_accounts[user]:
                if account[-1] != a[-1]:
                    # Types of account do not match.
                    # Therefore do not consider the other accounts that
                    # the user has access to as a better match unless
                    # the job is running under Standard Flux and the user
                    # has a FluxM, FluxOD, or FluxOE account available.
                    if account[-1] != 'x':
                        continue  # job is not running under Standard Flux
                    if a[-1] != 'm' and a[-2:] != 'od' and a[-2:] != 'oe':
                        continue  # user has no Flux{M,OD,OE} account available
                # FluxOD accounts don't have MAXPROC, so assume 1500
                a_maxproc = account_info[a].get( 'maxproc', 0 )
                if a_maxproc == 0 and a[-2:] == 'od':
                    a_maxproc = 1500
                # FluxOE accounts don't have a MAXMEM, so assume 4 GB/core
                a_maxmem = account_info[a].get( 'maxmem', a_maxproc * 4096 * 1024 )
                if cores > 0 and mem > 0 and cores <= a_maxproc and mem <= a_maxmem:
                    better.append(a)
            if better:
                msg = "%s job %s should be running under %s instead of %s" % ( user, id, " or ".join(better), account )
                warnings[user].append(msg)
        #
        # ...end of checks...
        #
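        # Accumulate per-user and per-account totals: cm is core-seconds
        # (cores times remaining walltime); the reports below divide by
        # 30*24*60*60 to express it as 30-day core-months ("CM left").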
        w = walltime
        if attrib['Walltime'] and attrib['Walltime']['Remaining']:
            w = int( attrib['Walltime']['Remaining'] )
        cm = 1.0 * cores * w
        flux_usage[account][user] = cm + flux_usage[account].setdefault( user, 0 )
        flux_runningjobcount_byuser[account][user] = 1 + flux_runningjobcount_byuser[account].setdefault( user, 0 )
        flux_runningcorecount_byuser[account][user] = cores + flux_runningcorecount_byuser[account].setdefault( user, 0 )
        flux_runningmemcount_byuser[account][user] = mem + flux_runningmemcount_byuser[account].setdefault( user, 0 )
        flux_runningpecount_byuser[account][user] = pe + flux_runningpecount_byuser[account].setdefault( user, 0 )
        flux_cores[account] += cores
        flux_mem[account] += mem
        flux_pe[account] += pe
        flux_cm[account] += cm
        flux_runningjobs[account].append( ( id, user, cores, mem, pe, walltime, cm ) )
    num_warnings = len( warnings.keys() )
    if num_warnings > 0:
        print "\nWARNING: the following jobs violate the LSA public allocation usage policy:\n"
        for user in warnings:
            if args.mail: print "sending mail to user " + user + ":"
            user_warnings = warnings[user]
            for msg in user_warnings:
                print "  " + msg
            if args.mail:
                if user in add_fluxm_tip:
                    user_warnings.append("""
TIP: Always run a job under lsa_flux if it requires:
  - 20 or fewer cores and 80 GB or less RAM on a single node, OR
  - 24 or fewer cores or 96 GB or less RAM across multiple nodes""")
                message = """\
From: %(from)s
Reply-To: [email protected]
To: %(to)s
Cc: %(cc)s
Subject: LSA Public allocation usage policy violation
Content-type: text/plain

Hello,

You are currently running the following jobs which appear to violate
the usage policy for the LSA public Flux allocations:

%(warnings)s

Please delete these jobs, address the problem, and resubmit the jobs.

For more information on the LSA public Flux allocations' usage
policies, together with information on how to stay within the usage
limits, please see

    http://arc-ts.umich.edu/lsa-flux/

This policy is intended to help ensure the fair sharing of LSA's
public Flux allocations.  People whose job submissions do not adhere
to the policy will receive email warnings such as this one.  An
excessive number of notifications may result in your being removed
from the LSA allocations.

You can check your own usage by running the command

    lsa_flux_check --details

If you have any questions about the policy or how to comply with it,
or if you would like assistance, please let us know and we will be
glad to assist you.

Sincerely,
LSA IT Advocacy and Research Support
""" % { 'from': current_user + "@umich.edu", 'to': user + "@umich.edu",
        'cc': admin_email, 'warnings': "\n".join( user_warnings ) }
                pipe = subprocess.Popen( [ '/usr/sbin/sendmail', '-t', '-f', current_user + "@umich.edu" ], stdin=subprocess.PIPE )
                pipe.stdin.write( message )
                pipe.stdin.close()
                pipe.wait()
    if args.details:
        print ""
        print "Account   Users Jobs         Cores       Memory                  PE      CM"
        print "--------- ----- ------------ ----------- ------------------- ------ -------"
        for account in accounts:
            jobs = sum( flux_runningjobcount_byuser[account].values() )
            print "%-9s %5d %4d / %5d %5d / %3d %9s / %7s %6.2f %7.4f" % ( account, len( flux_runningjobcount_byuser[account].keys() ), jobs, flux_totaljobcount[account], flux_cores[account], account_cores[account], show_memory( flux_mem[account] ), account_mem[account], flux_pe[account], flux_cm[account] / (30*24*60*60) )
    if args.details and args.all:
        print ""
        print "User     Account   Jobs Cores Memory        PE CM left"
        print "-------- --------- ---- ----- --------- ------ -------"
        for account in flux_usage:
            for user in flux_usage[account]:
                msg = "%-8s %-9s %4d %5s %9s %6.2f %7.4f" % ( user, account, flux_runningjobcount_byuser[account][user], flux_runningcorecount_byuser[account][user], show_memory( flux_runningmemcount_byuser[account][user] ), flux_runningpecount_byuser[account][user], flux_usage[account][user] / (30 * 24 * 60 * 60) )
                print msg
    if args.details:
        print ""
        print "Job ID         Account   User     Cores Memory        PE  Walltime CM left"
        print "-------------- --------- -------- ----- --------- ------ --------- -------"
        for account in accounts:
            for job in flux_runningjobs[account]:
                ( id, user, cores, mem, pe, walltime, cm ) = job
                print "%-14s %-9s %-8s %5s %9s %6.2f %9s %7.4f" % ( id, account, user, cores if cores > 0 else '???', show_memory( mem ), pe, show_walltime( walltime ), cm / (30*24*60*60) )
def do_scheduled_report( args ):
    with Capturing() as output:
        main( args )
    message = """\
From: %(from)s
To: %(to)s
Subject: lsa_flux usage
Content-type: text/plain

%(report)s
""" % { 'from': current_user + "@umich.edu", 'to': admin_email,
        'report': str( output ) }
    pipe = subprocess.Popen( [ '/usr/sbin/sendmail', '-t', '-f', current_user + "@umich.edu" ], stdin=subprocess.PIPE )
    pipe.stdin.write( message )
    pipe.stdin.close()
    pipe.wait()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Checks to see if a user is complying with the usage limits for the LSA public Flux allocations")
    parser.add_argument("--all", help="check the usage of all users", action="store_true")
    parser.add_argument("--details", help="display extra information", action="store_true")
    parser.add_argument("--mail", help="send mail to offending users (Flux support staff only)", action="store_true")
    parser.add_argument("--daemonize", help="run as a daemon (Flux support staff only)", action="store_true")
    parser.add_argument("--submit", help="run as a self-resubmitting job (Flux support staff only)", action="store_true")
    args = parser.parse_args()
    if args.mail:
        if not flux_support_staff( current_user ):
            print sys.argv[0] + ": --mail can only be used by Flux support staff"
            exit( 1 )
    if args.submit:
        if not flux_support_staff( current_user ):
            print sys.argv[0] + ": --submit can only be used by Flux support staff"
            exit( 1 )
        args.all = args.details = args.mail = True
        args.daemonize = False
        workdir = os.getenv( 'PBS_O_WORKDIR', '.' )
        os.chdir( workdir )
        hour = time.localtime().tm_hour
        if sys.stdout.isatty():
            print "Becoming self-submitting job. Will check four times daily."
            if hour in scheduled_hours: do_scheduled_report( args )
        else:
            do_scheduled_report( args )
        next_hour = scheduled_hours[0]
        for h in scheduled_hours:
            if h > hour:
                next_hour = h
                break
        # -a delays the resubmitted job until the next scheduled hour
        pipe = subprocess.Popen( [ '/usr/local/torque/bin/qsub', '-N', 'lsa_flux_check', '-V', '-A', 'support_flux', '-q', 'flux', '-M', admin_email, '-m', 'a', '-l', 'procs=1,mem=2gb,walltime=15:00,qos=flux', '-S', '/bin/bash', '-j', 'oe', '-a', '%02d00' % next_hour ], stdin=subprocess.PIPE )
        pipe.stdin.write( os.path.abspath( inspect.getsourcefile( main ) ) + " --submit" )
        pipe.stdin.close()
        pipe.wait()
        exit( 0 )
    if args.daemonize:
        if not flux_support_staff( current_user ):
            print sys.argv[0] + ": --daemonize can only be used by Flux support staff"
            exit( 1 )
        args.all = args.details = args.mail = True
        print "Becoming daemon. Will check four times daily."
        with daemon.DaemonContext():
            while True:
                hour = time.localtime().tm_hour
                if hour in scheduled_hours: do_scheduled_report( args )
                time.sleep( 3600 )
    else:
        main( args )