Skip to content

Commit

Permalink
Flynt updates, some manual
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Nilsson committed Nov 7, 2023
1 parent 86ffe1c commit 8e8718c
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 461 deletions.
270 changes: 92 additions & 178 deletions pilot/user/atlas/common.py

Large diffs are not rendered by default.

108 changes: 51 additions & 57 deletions pilot/user/atlas/container.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pilot/user/atlas/copytool_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def get_path(scope, lfn):
:return: partial rucio path (string).
"""

s = '%s:%s' % (scope, lfn)
s = f'{scope}:{lfn}'
hash_hex = md5(s.encode('utf-8')).hexdigest()
paths = scope.split('.') + [hash_hex[0:2], hash_hex[2:4], lfn]
paths = [_f for _f in paths if _f] # remove empty parts to avoid double /-chars
Expand Down
41 changes: 2 additions & 39 deletions pilot/user/atlas/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def get_core_count(job):
try:
job.corecount = int(os.environ.get('ATHENA_PROC_NUMBER'))
except (ValueError, TypeError) as exc:
logger.warning("ATHENA_PROC_NUMBER is not properly set: %s (will use existing job.corecount value)", exc)
logger.warning(f"ATHENA_PROC_NUMBER is not properly set: {exc} "
f"(will use existing job.corecount value)")
else:
try:
job.corecount = int(os.environ.get('ATHENA_PROC_NUMBER'))
Expand Down Expand Up @@ -84,21 +85,6 @@ def set_core_counts(**kwargs):
# something like this could be used if prmon also gave info about ncores
# (change nprocs -> ncores and add ncores to list in utilities module, get_average_summary_dictionary_prmon())

#summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor)
#if summary_dictionary:
# if 'nprocs' in summary_dictionary["Other"]:
# try:
# job.actualcorecount = int(summary_dictionary["Other"]["nprocs"])
# except Exception as exc:
# logger.warning('exception caught: %s', exc)
# else:
# job.corecounts = add_core_count(job.actualcorecount)
# logger.debug('current core counts list: %s', str(job.corecounts))
# else:
# logger.debug('summary_dictionary[Other]=%s', summary_dictionary["Other"])
#else:
# logger.debug('no summary_dictionary')

job = kwargs.get('job', None)
walltime = kwargs.get('walltime', None)

Expand Down Expand Up @@ -129,26 +115,3 @@ def set_core_counts(**kwargs):
logger.debug('no summary dictionary')
else:
logger.debug(f'failed to calculate number of cores (walltime={walltime})')

# if job and job.pgrp:
# # ps axo pgid,psr -> 154628 8 \n 154628 9 \n 1546280 1 ..
# # sort is redundant; uniq removes any duplicate lines; wc -l gives the final count
# # awk is added to get the pgrp list only and then grep -x makes sure that false positives are removed, e.g. 1546280
# cmd = "ps axo pgid,psr | sort | grep %d | uniq | awk '{print $1}' | grep -x %d | wc -l" % (job.pgrp, job.pgrp)
# _, stdout, _ = execute(cmd, mute=True)
# logger.debug('%s: %s', cmd, stdout)
# try:
# job.actualcorecount = int(stdout)
# except ValueError as exc:
# logger.warning('failed to convert number of actual cores to int: %s', exc)
# else:
# job.corecounts = add_core_count(job.actualcorecount) #, core_counts=job.corecounts)
# #logger.debug('current core counts list: %s', str(job.corecounts))
# # check suspicious values
# #if job.actualcorecount > 5:
# # logger.warning('detected large actualcorecount: %d', job.actualcorecount)
# # cmd = "ps axo pgid,stat,euid,ruid,tty,tpgid,sess,pgrp,ppid,pid,pcpu,comm | sort | uniq | grep %d" % job.pgrp
# # exit_code, stdout, stderr = execute(cmd, mute=True)
# # logger.debug('%s (pgrp=%d): %s', cmd, job.pgrp, stdout)
# else:
# logger.debug('payload process group not set - cannot check number of cores used by payload')
53 changes: 27 additions & 26 deletions pilot/user/atlas/dbrelease.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ def get_dbrelease_dir():
logger.warning("note: the DBRelease database directory is not available (will not attempt to skip DBRelease stage-in)")
else:
if os.path.exists(path):
logger.info("local DBRelease path verified: %s (will attempt to skip DBRelease stage-in)", path)
logger.info(f"local DBRelease path verified: {path} (will attempt to skip DBRelease stage-in)")
else:
logger.warning("note: local DBRelease path does not exist: %s (will not attempt to skip DBRelease stage-in)", path)
logger.warning(f"note: local DBRelease path does not exist: {path} "
f"(will not attempt to skip DBRelease stage-in)")

return path

Expand Down Expand Up @@ -107,14 +108,14 @@ def is_dbrelease_available(version):
# is the required DBRelease version available?
if dir_list:
if version in dir_list:
logger.info("found version %s in path %s (%d releases found)", version, path, len(dir_list))
logger.info(f"found version {version} in path {path} ({len(dir_list)} releases found)")
status = True
else:
logger.warning("did not find version %s in path %s (%d releases found)", version, path, len(dir_list))
logger.warning(f"did not find version {version} in path {path} ({len(dir_list)} releases found)")
else:
logger.warning("empty DBRelease directory list: %s", path)
logger.warning(f"empty DBRelease directory list: {path}")
else:
logger.warning('no such DBRelease path: %s', path)
logger.warning(f'no such DBRelease path: {path}')

return status

Expand All @@ -135,21 +136,21 @@ def create_setup_file(version, path):
if _dir != "" and version != "":
# create the python code string to be written to file
txt = "import os\n"
txt += "os.environ['DBRELEASE'] = '%s'\n" % version
txt += "os.environ['DATAPATH'] = '%s/%s:' + os.environ['DATAPATH']\n" % (_dir, version)
txt += "os.environ['DBRELEASE_REQUIRED'] = '%s'\n" % version
txt += "os.environ['DBRELEASE_REQUESTED'] = '%s'\n" % version
txt += "os.environ['CORAL_DBLOOKUP_PATH'] = '%s/%s/XMLConfig'\n" % (_dir, version)
txt += f"os.environ['DBRELEASE'] = '{version}'\n"
txt += f"os.environ['DATAPATH'] = '{_dir}/{version}:' + os.environ['DATAPATH']\n"
txt += f"os.environ['DBRELEASE_REQUIRED'] = '{version}'\n"
txt += f"os.environ['DBRELEASE_REQUESTED'] = '{version}'\n"
txt += f"os.environ['CORAL_DBLOOKUP_PATH'] = '{_dir}/{version}/XMLConfig'\n"

try:
status = write_file(path, txt)
except FileHandlingFailure as exc:
logger.warning('failed to create DBRelease setup file: %s', exc)
logger.warning(f'failed to create DBRelease setup file: {exc}')
else:
logger.info("Created setup file with the following content:.................................\n%s", txt)
logger.info(f"Created setup file with the following content:.................................\n{txt}")
logger.info("...............................................................................")
else:
logger.warning('failed to create %s for DBRelease version=%s and directory=%s', path, version, _dir)
logger.warning(f'failed to create {path} for DBRelease version={version} and directory {_dir}')

return status

Expand All @@ -171,55 +172,55 @@ def create_dbrelease(version, path):
try:
mkdirs(_path, chmod=None)
except PilotException as exc:
logger.warning('failed to create directories for DBRelease: %s', exc)
logger.warning(f'failed to create directories for DBRelease: {exc}')
else:
logger.debug('created directories: %s', _path)
logger.debug(f'created directories: {_path}')

# create the setup file in the DBRelease directory
version_path = os.path.join(dbrelease_path, version)
setup_filename = "setup.py"
_path = os.path.join(version_path, setup_filename)
if create_setup_file(version, _path):
logger.info("created DBRelease setup file: %s", _path)
logger.info(f"created DBRelease setup file: {_path}")

# now create a new DBRelease tarball
filename = os.path.join(path, "DBRelease-%s.tar.gz" % version)
logger.info("creating file: %s", filename)
filename = os.path.join(path, f"DBRelease-{version}.tar.gz")
logger.info(f"creating file: {filename}")
try:
tar = tarfile.open(filename, "w:gz")
except (IOError, OSError) as exc:
logger.warning("could not create DBRelease tar file: %s", exc)
logger.warning(f"could not create DBRelease tar file: {exc}")
else:
if tar:
# add the setup file to the tar file
tar.add("%s/DBRelease/%s/%s" % (path, version, setup_filename))
tar.add(f"{path}/DBRelease/{version}/{setup_filename}")

# create the symbolic link DBRelease/current -> 12.2.1
try:
_link = os.path.join(path, "DBRelease/current")
os.symlink(version, _link)
except OSError as exc:
logger.warning("failed to create symbolic link %s: %s", _link, exc)
logger.warning(f"failed to create symbolic link {_link}: {exc}")
else:
logger.warning("created symbolic link: %s", _link)
logger.warning(f"created symbolic link: {_link}")

# add the symbolic link to the tar file
tar.add(_link)

# done with the tar archive
tar.close()

logger.info("created new DBRelease tar file: %s", filename)
logger.info(f"created new DBRelease tar file: {filename}")
status = True
else:
logger.warning("failed to open DBRelease tar file")

# clean up
if rmdirs(dbrelease_path):
logger.debug("cleaned up directories in path: %s", dbrelease_path)
logger.debug(f"cleaned up directories in path: {dbrelease_path}")
else:
logger.warning("failed to create DBRelease setup file")
if rmdirs(dbrelease_path):
logger.debug("cleaned up directories in path: %s", dbrelease_path)
logger.debug(f"cleaned up directories in path: {dbrelease_path}")

return status
Loading

0 comments on commit 8e8718c

Please sign in to comment.