* ##### qsub
qsub is used to submit jobs. Generally, the command will look like:
[user@magic]$ qsub <PBS Script>
where <PBS Script> requests the number of desired nodes and cores, sets walltime, and launches the job with mpirun.
* ##### qstat
qstat displays basic information about the currently running (R), queued (Q), and recently completed (C) jobs. The most basic usage of qstat is:
[user@magic]$ qstat
This displays all current and recently completed jobs. If a number of people are using the cluster, this is likely too much information. Therefore, we can look at just a single user's jobs using:
[user@magic]$ qstat -au <username>
The output then looks like:
[[/images/qstat.png]]
The columns of this output show:
* Job ID - When communicating about or querying a specific job it is possible to use either the numeric job identifier (3574204) or the fully qualified name (3574204.systemimager.magic).
* Username - The user who initiated the job.
* Queue - The queue type the job is waiting or processing in.
* Jobname - The user defined jobname.
* SessID - A system generated identifier.
* NDS - The number of nodes requested.
* Req'd TSK - The total number of cores requested.
* ##### qselect
qselect is a convenient way to get a listing of the Job IDs based on some criteria. For example, if I want all the jobs for a specific user I can run:
[user@magic]$ qselect -u jlaura
This provides a listing of fully qualified Job Ids. It is also possible to get jobs of only a specific status using:
[user@magic]$ qselect -u jlaura -s <Status Code>
For example, if I want to know the Job Ids for all currently running jobs for my user I would use:
[user@magic]$ qselect -u jlaura -s R
The ability to get a listing of job status without additional processing information is useful for both BASH scripting of some functionality (see qdel below) and parsing within a python script.
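Since qselect prints one fully qualified Job ID per line, parsing its output from Python is straightforward. Below is a minimal sketch (the get_job_ids helper and the username are only illustrative) using the standard library's subprocess module:
```python
import subprocess

# Run qselect for a user and an optional status code (e.g. R or Q)
# and return the fully qualified Job IDs as a list of strings.
def get_job_ids(user, status=None):
    cmd = ['qselect', '-u', user]
    if status:
        cmd += ['-s', status]
    output = subprocess.check_output(cmd).decode()
    # qselect prints one Job ID per line; drop empty lines.
    return [line.strip() for line in output.splitlines() if line.strip()]

# Example: all queued jobs for user jlaura
print(get_job_ids('jlaura', status='Q'))
```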
* ##### qdel
qdel is used to delete queued or held jobs. The most basic usage, to delete a single job, is:
[user@magic]$ qdel <Job ID>
This removes the job from the queue and sets the job status to completed (C).
When running a larger job it may be possible that a log file is being incorrectly written, or that the first jobs to run take significantly longer than expected. In this case it is convenient to be able to delete all the queued jobs using a single command. qselect and xargs can be used in conjunction with qdel to do this:
[user@magic]$ qselect -u jlaura -s Q | xargs qdel
Here, qselect selects the jobs, -u flags that a single user's jobs are to be listed, and -s flags that only a specific status of job is to be listed (queued (Q) in this case). The output of this select statement is then piped to qdel using xargs.
For full documentation see the cluster resources manual page for qdel.
* ##### qrun
qrun can be used to force a queued job to run. On Magic I have been seeing jobs sit queued when all resources are available. It is unclear whether this is due to how the Python launching script is working, the way that mpi4py and the scheduler are interacting to lock and free resources, or some other issue. If resources are available and a job is sitting queued for a long period of time, it can be manually run using:
[user@magic]$ qrun <Job ID>
For full documentation see the cluster resources manual page for qrun.
* ##### qsig
qsig can be used to send Linux-style signals to processes. This is useful if a job has hung and should be killed. qsig usage is:
[user@magic]$ qsig -s <SIGNAL CODE> <Job Id>
For example, if Job ID 3574204.systemimager.magic is not responding I can send a KILL (SIGKILL / 9) to the processes using:
[user@magic]$ qsig -s KILL 3574204.systemimager.magic
This should terminate the job.
Alternatively, if the job is hung and you would normally kill it with ctrl-c, you can send a SIGINT to interrupt it. For example:
[user@magic]$ qsig -s SIGINT 3574204.systemimager.magic
For full documentation see the cluster resources manual page for qsig.
* ##### qalter
qalter allows the user to alter queued jobs. This is useful when the requested walltime is less than the total expected walltime. For example, to set a new walltime (specified as HH:MM:SS):
[user@magic]$ qalter <Job ID> -l walltime=<HH:MM:SS>
For full documentation see the cluster resources manual page for qalter.
The Message Passing Interface (MPI) is the method used to communicate and transfer data between processes and nodes. Communication is handled in the background, and the methods provided by MPI wrap these communication protocols.
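As a quick orientation, every process launched by mpirun runs the same script and can query its own rank and the total number of processes. A minimal sketch (the script name hello_mpi.py is only an example):
```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
# Each process launched by mpirun executes this script; rank identifies the process.
rank = comm.Get_rank()
size = comm.Get_size()
print("Hello from rank {} of {}".format(rank, size))
```
Launched with, for example, mpirun -np 4 python hello_mpi.py, each of the four processes prints its own rank.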
* ##### mpirun vs. mpiexec
It appears that mpirun and mpiexec are symlinked to the same executable. This is supported by this discussion post on the Open MPI mailing list. Therefore, it should be safe to use either mpirun or mpiexec in the PBS script used to launch the job.
* ##### mpi4py
mpi4py is a Python module that wraps many of the MPI (written in C) methods. When writing a Python script, mpi4py provides all the tools necessary to manage communication and data passing.
* Installation
We assume that you are using the newest Anaconda Python release. The installer should have asked whether you wished to append to your .profile or .bashrc. We assume that you have, and that running python from the command prompt launches an interactive Anaconda Python session.
To install mpi4py for your user on the cluster, simply run pip install mpi4py.
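To verify the installation (a minimal check), confirm that mpi4py imports and reports the underlying MPI implementation:
```python
# Quick sanity check that mpi4py imports and sees an MPI implementation
from mpi4py import MPI
print(MPI.Get_version())   # MPI standard version, e.g. (2, 1)
print(MPI.get_vendor())    # Implementation name and version, e.g. ('Open MPI', (1, 6, 4))
```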
* ##### Data Passing
mpi4py broadly provides two methods of data passing - efficient NumPy arrays and generic Python objects.
* NumPy Arrays
Passing data as NumPy arrays is the most efficient method of data sharing. NumPy arrays can be typed by MPI and therefore do not need to be pickled. For example:
```python
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
# The root generates a uniform random vector of length 10; the other ranks allocate an empty receive buffer
if comm.Get_rank() == 0:
    data_to_share = np.random.random(10)
else:
    data_to_share = np.empty(10, dtype=np.float64)
# Broadcast the array in place from the parent (root=0) as type Double
comm.Bcast([data_to_share, MPI.DOUBLE], root=0)
```
* Generic Python Objects
Caveats:
1. Data passed as generic Python objects will generally be less efficient than passing a typed NumPy array.
2. Python objects must be picklable. This includes all native datatypes: booleans, integers, floats, strings, and None. Additionally, lists, tuples, and dictionaries of these objects are picklable. Generally, class instances (objects) are picklable, but I have not been able to pass a class instance as a Python type (i.e. passing self to a class or passing foo where foo = myClass()). A minimal sketch of passing a generic Python object follows below.
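As that sketch, a dictionary (the keys and values here are only illustrative) can be broadcast with the lowercase, pickle-based bcast method:
```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
# Only the root defines the object; bcast pickles it and returns a copy on every rank.
if comm.Get_rank() == 0:
    params = {'tolerance': 0.01, 'iterations': 100, 'label': 'demo'}
else:
    params = None
params = comm.bcast(params, root=0)
print(params)
```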
* ##### Communication Paradigms
The University of Texas at Austin Advanced Computing Center presentation (© UT Austin) on mpi4py provides a wonderful graphical representation of the message passing methods available via mpi4py:
Available communication methods are:
1. Point-to-Point (a minimal Send/Recv sketch is given after this list)
* comm.Send()
* comm.Recv()
2. Collective
* comm.bcast(data, root)
Referencing the above image, we see that bcast broadcasts the data from the parent node to all child nodes.
* comm.scatter(source_data, root)
Python Objects (the lowercase method returns the local portion to each rank)
* comm.Scatter([source_data, type], [destination_data, type], root)
NumPy Arrays
By way of example, assume a vector of length 100, one parent core (A), and 4 available child processing cores (B-E). After computing the best possible decomposition (segmentation) of the vector, it is possible to scatter the data to the children such that B processes vector indices [0-24], C processes indices [25-49], D processes indices [50-74], and E processes indices [75-99]. The root or parent process waits and gathers the data once processing is complete.
* comm.gather(local_data, root)
Python Objects (the lowercase method returns the full list of results on the root rank)
* comm.Gather([source_data, type], [destination_data, type], root)
NumPy Arrays
Using the above scatter example, once processing is complete, the root or parent process can gather the results to a local array (or picklable Python data structure) for further processing, e.g. to write to a log file or generate a figure.
* comm.Allgather([send_data, type], [recv_data, type])
Gathers the results to all cores. For example, if core A scatters data to cores B-E, cores B-E process that data, and all cores then call Allgather(), every core will have the full set of results.
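The point-to-point sketch referenced above: rank 0 sends a typed NumPy buffer to rank 1 (the array length and tag value are arbitrary):
```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Run with at least two processes, e.g. mpirun -np 2 python send_recv.py
if rank == 0:
    payload = np.arange(10, dtype=np.float64)
    # Blocking send of a typed buffer to rank 1
    comm.Send([payload, MPI.DOUBLE], dest=1, tag=11)
elif rank == 1:
    payload = np.empty(10, dtype=np.float64)
    # Blocking receive into a preallocated buffer from rank 0
    comm.Recv([payload, MPI.DOUBLE], source=0, tag=11)
    print("Rank 1 received: {}".format(payload))
```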
* ##### Sample Scatter Gather Using NumPy Arrays
To run the code below, either clone the pPysal repository and navigate to the geoda_cluster directory (the script is named scatter_gather.py) or copy the code to a new Python file (using vim).
To run use: mpirun -np 4 python scatter_gather.py
```python
from mpi4py import MPI
import time
import numpy as np

# Create the communicator object
comm = MPI.COMM_WORLD
# Get the ID (rank) of this core
rank = comm.Get_rank()
# Get the number of cores
ncores = comm.Get_size()

local_size = 100
global_size = local_size * ncores

# Create the sample data in the parent process
if rank == 0:
    data = np.arange(global_size, dtype=np.float64)
else:
    data = None

# A local buffer to store each core's portion of the array
localdata = np.empty(local_size, dtype=np.float64)

# Scatter(from, to)
comm.Scatter([data, MPI.DOUBLE], [localdata, MPI.DOUBLE], root=0)

# Add a rank-dependent offset to all the local values
# This could be any array manipulation, a function, return a masked array, etc.
localdata += ((rank + 1) * 3)

# Gather(from, to)
comm.Gather([localdata, MPI.DOUBLE], [data, MPI.DOUBLE], root=0)

# Since this used Gather() and not Allgather(), only rank 0 has all the data.
if rank == 0:
    print(data)
```
PBS scripts are used to schedule jobs on the cluster. In the previous section a job was run directly using mpirun. This job did not take advantage of the cluster's scheduling. Therefore, always schedule jobs using PBS. The script can be run as a BASH script or wrapped in Python.
* ##### Basic Script
Below is a simple PBS script designed to run the scatter_gather.py script above. Note that <username> in line 10 needs to be replaced with your username. Additionally, this script assumes that you have cloned the pPysal repo to your home directory on the cluster (git clone https://github.com/pysal/pPysal.git).
Also note that the script must be executable. This is true for all scripts (Python or bash) that you might want to use.
To make the script executable use: chmod +x script_name.
Then submit the job to the scheduler: qsub pbs_scattergather.sh.
```bash
#!/bin/bash
#PBS -S /bin/bash
#PBS -l nodes=2:ppn=8
#PBS -N ScatterDemo
#PBS -o $JOB_ID.out
#PBS -e $JOB_ID.err
#PBS -A Jay_Testing
#PBS -l walltime=01:00:00
use openmpi-1.6.4
cd /home/<username>/pPysal/geoda_cluster/
mpirun -np 1 python scatter_gather.py 1>output 2>error
```
Warning: This is not currently working properly when submitted via PBS.
* ##### Arguments
* ##### PBS via Python
* ##### Basic Wrapper
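A minimal sketch of such a wrapper (submission from Python via the standard library's subprocess module; error handling and templating of PBS arguments are left out, and pbs_scattergather.sh is the script from the Basic Script section):
```python
import subprocess

# Submit a PBS script with qsub and return the Job ID that qsub prints
# to stdout (e.g. 3574204.systemimager.magic).
def submit(pbs_script):
    output = subprocess.check_output(['qsub', pbs_script])
    return output.strip().decode()

if __name__ == '__main__':
    job_id = submit('pbs_scattergather.sh')
    print("Submitted job: {}".format(job_id))
```
From here, the wrapper can loop over parameter sets, write a PBS script per job, and submit each one, which matches the many-small-jobs pattern discussed in the Q & A section.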
5. #### Fisher-Jenks Example
* Fisher-Jenks Python / PBS Script
* Fisher-Jenks MPI Script
* Analytics (Pandas)
6. #### Python on the Cluster
* Parallel Reads and Writes
* Logging
* Timing
7. #### MPE
* Better profiling
* Example MPE Script
8. #### Q & A
* ##### Answered Questions
* What is the -A flag in the PBS script? - An accounting flag for system admins to track usage.
* Does the PBS script need to end in .sh? - No, it can have a file type of .sh or nothing.
* Are a few large or many smaller jobs better? - The latter. This is why wrapping PBS in Python makes sense; it is possible to launch many more small jobs with a Python script running in a screen session.
* What is the best way to profile code? - Not sure yet, but MPE looks promising once it is running.
* How much memory is available per node? - 16GB
* Is a process's rank unique and sequential across nodes? - Probably not. This will require testing (a quick check is sketched after this list).
* Can I see what an individual node is doing? - Yes, simply ssh into the node. The final two address numerals are identical to the Magic web usage tracker node number.
* An extension to the previous question, commenting out the log files, it appears that the gather never occurs from the child processes. What log file(s) might be of use in debugging it? - These reside in /var/spool/torque/ and then a series of subfolders for the pbs_mom and job specific directories (named for the job_id). Also the scheduling node (xx.xx.23.2) has additional PBS server logs.
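The quick check referenced above: each process reports its rank alongside the node it is running on (a minimal sketch; the script name and output format are only examples):
```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
# Each process prints its rank and the host it landed on, which shows
# how ranks are assigned within and across nodes.
print("Rank {} of {} on node {}".format(comm.Get_rank(),
                                        comm.Get_size(),
                                        MPI.Get_processor_name()))
```
Submitting this through a PBS script that requests nodes=2:ppn=8 shows how the ranks map onto the two nodes.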
* ##### New Questions
* Launching many small jobs seems to lock up the scheduler, leaving some running indefinitely and others queued for an extended period of time. What is going on?
* The scattergather_submit script will not launch via PBS. It appears that the log files are not able to access the directory. Any ideas?
* From my examples I can report that a PBS script with mpirun -np X, where X is any value other than 1, runs multiple jobs. That is, (number of requested nodes) * (number of requested cores) * X jobs. This does not appear to be in line with what I should expect from the documentation. What is the expected behavior?