Skip to content

Commit

Permalink
Merge pull request #102 from jmills-ncar/master
Browse files Browse the repository at this point in the history
A number of package upgrades to better handle PBS
  • Loading branch information
T. Joe Mills authored Aug 1, 2018
2 parents a4860de + c3a444f commit 81e377a
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 88 deletions.
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

setup(
name='wrfhydropy',
version='0.0.5dev0',
version='0.0.5',
packages=find_packages(),
package_data={'wrfhydropy': ['core/data/*']},
url='https://github.com/NCAR/wrf_hydro_py',
license='MIT',
install_requires=['pandas',
'f90nml',
'netcdf4',
'deepdiff',
'pathlib',
'xarray',
'datetime',
'pytest',
'pytest-datadir-ng'],
'pytest-datadir-ng',
'boltons'],
author='Joe Mills',
author_email='[email protected]',
description='Crude API for the WRF-Hydro model',
Expand Down
3 changes: 2 additions & 1 deletion wrfhydropy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
from .core.simulation import *
from .core import schedulers
from .core import outputdiffs
from .core import ioutils
from .core import ioutils
from .core import namelist
1 change: 1 addition & 0 deletions wrfhydropy/core/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import warnings
import pathlib
import netCDF4

from boltons import iterutils

Expand Down
84 changes: 42 additions & 42 deletions wrfhydropy/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,6 @@ def __init__(
self._hydro_namelist = None
self._hrldas_namelist = None

# # Attributes set by class methods
# self.hrldas_times = {'noahlsm_offline':
# {'kday':None,
# 'khour':None,
# 'start_year':None,
# 'start_month':None,
# 'start_day': None,
# 'start_hour':None,
# 'start_min':None,
# 'restart_filename_requested': None}
# }
# """dict: the HRLDAS namelist used for this job."""
#
# self.hydro_times = {'hydro_nlist':
# {'restart_file':None},
# 'nudging_nlist':
# {'nudginglastobsfile':None}
# }
# """dict: the hydro namelist used for this job."""

self.exit_status = None
"""int: The exit status of the model job parsed from WRF-Hydro diag files"""

Expand Down Expand Up @@ -142,12 +122,20 @@ def clone(self, N) -> list:
clones.append(copy.deepcopy(self))
return(clones)

def pickle(self,path: str):
"""Pickle sim object to specified file path
Args:
path: The file path for pickle
"""
path = pathlib.Path(path)
with path.open(mode='wb') as f:
pickle.dump(self, f, 2)

def _run(self):
"""Private method to run a job"""

# Create curent dir path to use for all operations. Needed so that everything can be run
# relative to the simulation directory

current_dir = pathlib.Path(os.curdir)

# Print some basic info about the run
Expand Down Expand Up @@ -202,21 +190,28 @@ def _run(self):
self.job_end_time = str(datetime.datetime.now())

# String match diag files for successfull run
self.exit_status = 1
with current_dir.joinpath('diag_hydro.00000').open() as f:
diag_file = f.read()
if 'The model finished successfully.......' in diag_file:
self.exit_status = 0

# cleanup job-specific run files
diag_files = current_dir.glob('*diag*')
for file in diag_files:
shutil.move(str(file), str(self.job_dir))

shutil.move(str(self.stdout_file),str(self.job_dir))
shutil.move(str(self.stderr_file),str(self.job_dir))
current_dir.joinpath('hydro.namelist').unlink()
current_dir.joinpath('namelist.hrldas').unlink()
diag_file = current_dir.joinpath('diag_hydro.00000')
if diag_file.exists():
with diag_file.open() as f:
diag_file = f.read()
if 'The model finished successfully.......' in diag_file:
self.exit_status = 0

# cleanup job-specific run files
diag_files = current_dir.glob('*diag*')
for file in diag_files:
shutil.move(str(file), str(self.job_dir))

shutil.move(str(self.stdout_file),str(self.job_dir))
shutil.move(str(self.stderr_file),str(self.job_dir))
current_dir.joinpath('hydro.namelist').unlink()
current_dir.joinpath('namelist.hrldas').unlink()
else:
self.exit_status = 1
self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl')))
raise RuntimeError('Model did not finish successfully')

self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl')))

def _write_namelists(self):
"""Private method to write namelist dicts to FORTRAN namelist files"""
Expand Down Expand Up @@ -289,7 +284,7 @@ def _write_run_script(self):
"""Private method to write a python script to run the job. This is used primarily for
compatibility with job schedulers on HPC systems"""

self._pickle()
self.pickle(str(self.job_dir.joinpath('WrfHydroJob_prerun.pkl')))

pystr = ""
pystr += "# import modules\n"
Expand All @@ -310,11 +305,10 @@ def _write_run_script(self):
pystr += "\n"

pystr += "#load job object\n"
pystr += "job_dir = 'job_' + args.job_id + '/WrfHydroJob.pkl'\n"
pystr += "job = pickle.load(open(job_dir,mode='rb'))\n"
pystr += "job_file = 'job_' + args.job_id + '/WrfHydroJob_prerun.pkl'\n"
pystr += "job = pickle.load(open(job_file,mode='rb'))\n"
pystr += "#Run the job\n"
pystr += "job._run()\n"
pystr += "job._pickle()\n"

pystr_file = 'run_job.py'
with open(pystr_file,mode='w') as f:
Expand All @@ -341,10 +335,16 @@ def _solve_model_start_end_times(self):

return model_start_time, model_end_time

def _pickle(self):
with self.job_dir.joinpath('WrfHydroJob.pkl').open(mode='wb') as f:
def pickle(self,path: str):
"""Pickle job object to specified file path
Args:
path: The file path for pickle
"""
path = pathlib.Path(path)
with path.open(mode='wb') as f:
pickle.dump(self, f, 2)


@property
def job_dir(self):
"""Path: Path to the run directory"""
Expand Down
66 changes: 33 additions & 33 deletions wrfhydropy/core/schedulers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Note: All other imports for individual schedulers should be done in the respective scheduler
# class functions so that imports can be isolated to relevant schedulers
import copy

from abc import ABC, abstractmethod
from .job import Job
Expand All @@ -10,21 +9,18 @@ def __init__(self):
super().__init__()

@abstractmethod
def add_job(self, job: Job):
pass

@abstractmethod
def schedule(self):
def schedule(self,jobs):
pass

class PBSCheyenne(Scheduler):

"""A Scheduler object compatible with PBS on the NCAR Cheyenne system."""
def __init__(
self,
account: str,
email_who: str = None,
email_when: str = 'abe',
nproc: int = 360,
nproc: int = 72,
nnodes: int = None,
ppn: int = 36,
queue: str = 'regular',
Expand All @@ -49,13 +45,6 @@ def __init__(
self._nnodes = nnodes
self._ppn = ppn

# Attribute
self.jobs = []
self.scheduled_jobs = []

# Setup exe cmd, will overwrite job exe cmd
self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe'

## Scheduler options dict
## TODO: Make this more elegant than hard coding for maintenance sake
self.scheduler_opts = {'account':account,
Expand All @@ -64,17 +53,17 @@ def __init__(
'queue':queue,
'walltime':walltime}

def add_job(self,job: Job):
"""Add a job to the scheduler"""
job = copy.deepcopy(job)

#Override job exe cmd with scheduler exe cmd
job._exe_cmd = self._exe_cmd

self.jobs.append(job)

def schedule(self):
"""Schedule jobs that have been added to the scheduler"""
# Setup exe cmd, will overwrite job exe cmd
if self.scheduler_opts['queue'] == 'shared':
self._exe_cmd = 'mpirun -np {0} ./wrf_hydro.exe'.format(self.nproc)
else:
self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe'

def schedule(self,jobs: list):
"""Schedule one or more jobs using the scheduler scheduler
Args:
jobs: list of jobs to schedule
"""
import subprocess
import shlex
import pathlib
Expand All @@ -86,20 +75,20 @@ def schedule(self):
# they can't change the order, may not be an issue except for if scheduling fails
# somewhere

self._write_job_pbs()
self._write_job_pbs(jobs=jobs)

# Make lists to store pbs scripts and pbs job ids to get previous dependency
pbs_jids = []
pbs_scripts = []

qsub_str = '/bin/bash -c "'
for job_num, option in enumerate(self.jobs):
for job_num, option in enumerate(jobs):

# This gets the pbs script name and pbs jid for submission
# the obs jid is stored in a list so that the previous jid can be retrieved for
# dependency
job_id = self.jobs[job_num].job_id
pbs_scripts.append(str(self.jobs[job_num].job_dir) + '/job_' + job_id + '.pbs')
job_id = jobs[job_num].job_id
pbs_scripts.append(str(jobs[job_num].job_dir) + '/job_' + job_id + '.pbs')
pbs_jids.append('job_' + job_id)

# If first job, schedule using hold
Expand All @@ -121,9 +110,17 @@ def schedule(self):
subprocess.run(shlex.split('qrls $' + pbs_jids[0]),
cwd=str(current_dir))

def _write_job_pbs(self):
def _write_job_pbs(self,jobs):
"""Private method to write bash PBS scripts for submitting each job """
for job in self.jobs:
import copy
import sys

# Get the current pytohn executable to handle virtual environments in the scheduler
python_path = sys.executable

for job in jobs:
# Copy the job because the exe cmd is edited below
job = copy.deepcopy(job)

# Write PBS script
jobstr = ""
Expand All @@ -148,7 +145,7 @@ def _write_job_pbs(self):
jobstr += "\n"

jobstr += "# Not using PBS standard error and out files to capture model output\n"
jobstr += "# but these hidden files might catch output and errors from the scheduler.\n"
jobstr += "# but these files might catch output and errors from the scheduler.\n"
jobstr += "#PBS -o {0}\n".format(job.job_dir)
jobstr += "#PBS -e {0}\n".format(job.job_dir)
jobstr += "\n"
Expand All @@ -168,13 +165,16 @@ def _write_job_pbs(self):
if self.scheduler_opts['queue'] == 'share':
jobstr += "export MPI_USE_ARRAY=false\n"

jobstr += 'python run_job.py --job_id {0}\n'.format(job.job_id)
jobstr += '{0} run_job.py --job_id {1}\n'.format(python_path, job.job_id)
jobstr += 'exit $?\n'

pbs_file = job.job_dir.joinpath('job_' + job.job_id + '.pbs')
with pbs_file.open(mode='w') as f:
f.write(jobstr)

# Write the python run script for the job
## Overwrite job exe cmd with scheduler exe cmd
job._exe_cmd = self._exe_cmd
job._write_run_script()

def _solve_nodes_cores(self):
Expand Down
28 changes: 19 additions & 9 deletions wrfhydropy/core/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ def compose(self, symlink_domain: bool = True, force: bool = False):
print('Validating job input files')
self._validate_jobs()

# Add jobs to scheduler
if self.scheduler is not None:
print('Adding jobs to scheduler...')
for job in self.jobs:
self.scheduler.add_job(job)

# Compile model or copy files
if self.model.compile_log is not None:
if self.model.compile_log.returncode == 0:
Expand All @@ -121,16 +115,32 @@ def run(self):
for job in self.jobs:
job._run()
else:
self.scheduler.schedule()
self.scheduler.schedule(jobs=self.jobs)

# Overwrite the object after run if successfull
path = current_dir.joinpath('WrfHydroSim.pkl')
self.pickle(path)

def collect(self):
"""Collect simulation output after a run"""

current_dir = pathlib.Path(os.curdir).absolute()

# Overwrite sim job objects with collected objects matched on job id
## Create dict of index/ids so that globbed jobs match the original list order
id_index = dict()
for index, item in enumerate(self.jobs):
id_index[item.job_id] = index

## Insert collect jobs into sim job list
job_objs = current_dir.rglob('WrfHydroJob_postrun.pkl')
for job_obj in job_objs:
collect_job = pickle.load(job_obj.open(mode='rb'))
original_idx = id_index[collect_job.job_id]
self.jobs[original_idx] = collect_job

self.output = SimulationOutput()
self.output.collect(sim_dir=os.getcwd())
self.output.collect_output(sim_dir=os.getcwd())

def pickle(self,path: str):
"""Pickle sim object to specified file path
Expand Down Expand Up @@ -258,7 +268,7 @@ def __init__(self):
self.restart_nudging = None
"""list: List of nudgingLastObs WrfHydroStatic objects"""

def collect(self,sim_dir: Union[str,pathlib.Path]):
def collect_output(self,sim_dir: Union[str,pathlib.Path]):
"""Collect simulation output after a run
Args:
sim_dir: The simulation directory
Expand Down
Loading

0 comments on commit 81e377a

Please sign in to comment.