-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
An example of a job script from the UIUC "Hydro" cluster, and an example of a Python on-demand launcher that listens to the RabbitMQ process queue and launches a job if there is anything to process.
- Loading branch information
1 parent
fd35028
commit 8681b7c
Showing
2 changed files
with
282 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
import pika,sys,getopt | ||
import os | ||
import json | ||
import subprocess | ||
import time | ||
|
||
|
||
# --- Queue / batch-script configuration -------------------------------------
# Exactly one input queue is active at a time; alternatives are kept
# commented out for quick switching between pipeline stages.
#input_queue = "download"
#input_queue = "process_flat_iceberg"
input_queue = "process_golden_muscat"
#input_queue = "upload"
#output_queue = "send_to_processing"
#input_queue = "test_upload"

# Slurm batch script submitted for each worker job.
primary_batch_script="/projects/bbym/shared/CDR_processing/pipeline_processing_003/golden_muscat_launcher.bash"
#secondary_batch_scripts=["/projects/bbym/shared/CDR_processing/pipeline_processing_003/drab_volcano_launcher.bash"]

# Data directories (currently empty placeholders).  The original declared
# `global input_dir` / `global output_dir` here, but `global` at module
# scope is a no-op, so those redundant statements were removed.
input_dir=""
output_dir=""

###############
# this is only here because the include isn't working
#
# SECURITY NOTE(review): credentials are hard-coded in source; they should
# live only in the per-user secrets file loaded by set_up_RMQ().
RMQ_username = "criticalmaas"
RMQ_password = "keeNgoo1VahthuS4ii1r"
#
##############
|
||
def how_many_messages_in_queue(secrets_file,queue_name):
    """Return the number of messages currently waiting in *queue_name*.

    Opens a short-lived blocking connection to RabbitMQ (URI comes from
    set_up_RMQ(*secrets_file*)), declares the queue (durable=True, matching
    the producers — declaration is idempotent), reads the message count
    from the declare-ok reply, and closes the connection.

    The original leaked the connection if queue_declare raised, and read
    message_count a second time after close(); both are fixed here.
    """
    rabbitmq_uri=set_up_RMQ(secrets_file)

    parameters = pika.URLParameters(rabbitmq_uri)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        my_queue=channel.queue_declare(queue=queue_name,durable=True)
        # The Method frame is a snapshot taken at declare time; capture the
        # count before tearing the connection down.
        return_val=my_queue.method.message_count
    finally:
        # Always release the broker connection, even on error.
        connection.close()
    return return_val
|
||
|
||
|
||
|
||
# Disabled variant kept for reference: fires off secondary "warm-up" jobs
# before submitting the primary one.  As a bare module-level string literal
# it is a no-op at import time.
"""
def launch_job_return_ID_special(batch_file):
    global secondary_batch_scripts
    # run for launching when it's the *first* job launched in a while
    # This function also fires off a couple of secondary jobs that are
    # expected to run for a short time but then will terminate, leaving
    # the main ones running for this program to keep track of.
    for aux_batch_file in secondary_batch_scripts:
        print("running secondary script")
        subprocess.run(['sbatch', aux_batch_file], stdout=subprocess.PIPE)
    return launch_job_return_ID(batch_file)
"""
def launch_job_return_ID(batch_file):
    """Submit *batch_file* via sbatch and return the new Slurm job ID.

    Standard sbatch output is "Submitted batch job <jobid>", so the job ID
    is the last whitespace-separated token.  Using [-1] instead of the
    original hard-coded [3] is robust to locale/banner variations, and an
    explicit error replaces the bare IndexError on empty output.

    Raises:
        RuntimeError: if sbatch produced no output (e.g. submission failed).
    """
    sbatch_result=subprocess.run(['sbatch', batch_file], stdout=subprocess.PIPE)
    sbatch_string=sbatch_result.stdout.decode('utf-8')
    sbatch_output_list=sbatch_string.split()
    if not sbatch_output_list:
        raise RuntimeError(f"sbatch produced no output for {batch_file}")
    # (hopefully) returns the jobid of the new job
    return sbatch_output_list[-1]
|
||
def how_many_of_my_jobs_running(job_list):
    """Count how many job IDs in *job_list* still appear in `squeue`.

    Side effect: IDs no longer reported by squeue are pruned from
    *job_list* in place (those jobs have finished).

    Fixes over the original:
      * squeue was invoked once per tracked job; one invocation suffices.
      * items were removed from job_list while iterating over it, which
        silently skips the element following each removal.
      * blank squeue output lines raised IndexError on segment_list[0].
    """
    # Single squeue call; the first column of each line is the job ID
    # (the header line can never collide with a numeric job ID).
    squeue_result=subprocess.run(['squeue'], stdout=subprocess.PIPE)
    squeue_string=squeue_result.stdout.decode('utf-8')
    active_ids=set()
    for myline in squeue_string.splitlines():
        segment_list = myline.split()
        if segment_list:  # guard against blank lines
            active_ids.add(segment_list[0])

    still_running=[job for job in job_list if job in active_ids]
    for check_job in job_list:
        if check_job not in active_ids:
            # if the job isn't in the squeue output, then it's not running,
            # so we can remove it from the job list
            print(f"removing job {check_job} from the job list")
    # Prune in place so the caller's list object reflects the update.
    job_list[:] = still_running
    return len(still_running)
|
||
def set_up_RMQ(secrets_file):
    """Return the AMQP URI for the RabbitMQ server.

    If *secrets_file* exists it is executed so it may override the
    module-level RMQ_username / RMQ_password.  Note that the returned URI
    is currently hard-coded (the credential-based URI is commented out),
    so the loaded secrets only matter if that line is re-enabled.

    Fixes over the original: "~" was never expanded (os.path.exists on a
    literal "~/..." path is always False), and the Python-2-only
    execfile() was called — with the undefined name `filename` at that.
    """
    global RMQ_username
    global RMQ_password

    expanded = os.path.expanduser(secrets_file)
    if os.path.exists(expanded):
        # SECURITY NOTE(review): exec of the secrets file mirrors the
        # original execfile() intent; the file must be trusted.
        with open(expanded) as fh:
            exec(fh.read(), globals())

    # rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard"
    rabbitmq_uri = "amqp://ncsa:[email protected]:5672/%2F"
    return rabbitmq_uri
|
||
# The three string literals below are disabled consumer-callback code kept
# for reference; as bare module-level strings they are no-ops at import
# time.
"""
def callback(ch, method, properties, body):
    print("***Received:")
    my_catalog=json.loads(body)
    print("got catalog")
    print(my_catalog)
    # print("map name:>>"+my_catalog['map_name']+"<<")
"""

"""
    file_URL=my_catalog['cog_url']
    DL_command="cd "+input_dir+" ; wget "+file_URL
    print("download command: "+DL_command)
"""

"""
    # print("about to run download command")
    # os.system(DL_command)
    # print("finished download command")
    # result_file=input_dir+my_catalog['cog_id']+".cog.tif"
    print("input dir:"+input_dir)
    # print("resulting file: "+result_file)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('about to sleep for 1 seconds')
    time.sleep(.2)
    print('finished ack')
    # sys.exit(2)
"""
|
||
def main(argv):
    """Supervisory loop: keep worker jobs matched to queue depth.

    Every 5 minutes, read the RabbitMQ queue depth, derive a target number
    of Slurm jobs from it, and submit new jobs via sbatch until the number
    of our still-running jobs meets the target.  *argv* is currently
    unused.  Never returns under normal operation (infinite loop).
    """
    # secrets file per-user
    global input_queue
    global primary_batch_script

    input_queue_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue)
    print(f"Queue has {input_queue_count} items.")

    # Slurm job IDs we have launched and believe are still running;
    # pruned in place by how_many_of_my_jobs_running().
    job_list=[]
    # job_ID=launch_job_return_ID(primary_batch_script)
    #print(f"launched job {job_ID}")
    #job_list.append(job_ID)

    #print("about to sleep")
    #time.sleep(2)

    #for i in range(180):
    #    my_job_count=how_many_of_my_jobs_running(job_list)
    #    print(f"There are {my_job_count} of our jobs running ({i}).")
    #    time.sleep(1)

    while True :
        # Target job count steps with queue depth:
        # >0 -> 1 job, >10 -> 2, >20 -> 3, >30 -> 5.
        target_job_count = 0
        process_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue)
        if process_count > 0:
            target_job_count += 1
        if process_count > 10:
            target_job_count += 1
        if process_count > 20:
            target_job_count += 1
        if process_count > 30:
            target_job_count += 2
        currently_running=how_many_of_my_jobs_running(job_list)
        print(f"Status (prelaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}")
        jobs_launched=0
        # Launch jobs one at a time until enough of ours are running.
        while currently_running < target_job_count :
            job_ID=launch_job_return_ID(primary_batch_script)
            print(f"Launching job {job_ID}")
            job_list.append(job_ID)
            currently_running=how_many_of_my_jobs_running(job_list)
            jobs_launched += 1
        print(f"Status (postlaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}")
        # Poll interval between supervision passes.
        time.sleep(300)
        # time.sleep(15)

    # NOTE(review): everything below the `while True` loop is unreachable
    # dead code — debugging leftovers plus the disabled pika consumer
    # setup (kept as string literals).
    print("exiting for debugging.")
    sys.exit(2)

    """
    rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets")
    parameters = pika.URLParameters(rabbitmq_uri)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=input_queue, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=False)
    """
    print("Input data directory:"+input_dir)
    print("Output data directory:"+output_dir)
    print("Now beginning consuming loop:")

    """
    channel.start_consuming()
    """
|
||
if __name__ == '__main__':
    # Pass through CLI arguments (currently unused by main).
    main(sys.argv[1:])
    # main()
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
#!/bin/bash
# Slurm batch script: run the "golden_muscat" CriticalMAAS processing
# pipeline inside an Apptainer container on an a100 partition node of the
# UIUC Hydro cluster.

#SBATCH -p a100
#SBATCH -A bbym-hydro
#SBATCH --time 08:00:00
#SBATCH --mail-type=BEGIN,END
#SBATCH [email protected]
##SBATCH --cpus_per_task=56
#SBATCH -o CM_golden_muscat-%j.out
#SBATCH -e CM_golden_muscat-%j.err

# Model name handed to the pipeline container via --model.
INTERNAL_MODEL="golden_muscat"

cd "/projects/bbym/shared/CDR_processing/pipeline_processing_003"

# old URI
# --amqp amqp://guest:guest@rabbitmqserver:5672/%2F \

CONTAINER_FILE="./criticalmaas-pipeline_latest.sif"

# Bail out early if the container image is missing.
if [ ! -e ${CONTAINER_FILE} ]; then
  echo "container file ${CONTAINER_FILE} does not exist! Exiting."
  exit
  # apptainer pull criticalmaas-pipeline.sif docker://ncsa/criticalmaas-pipeline:rabbitmq
fi

# Working directories bind-mounted into the container below.
mkdir -p data output validation legends layouts logs feedback

#export CDR_SYSTEM="UIUC"
#export CDR_SYSTEM_VERSION="0.4.1"

# PIDs of backgrounded apptainer processes, waited on at the end.
JOBS=""
GPUS=$(nvidia-smi --list-gpus | wc -l)
echo "gpu list: ${GPUS}"
echo $GPUS
echo "GPU list finished"

# NOTE(review): the per-GPU loop is disabled; a single container instance
# is launched in the background and the commented lines are leftovers of
# the multi-GPU variant.
#for GPU_NUM in $(seq 0 $(( $GPUS - 1 )) ); do
#  --layout /layouts \
#  --validation /validation \
#  echo "setting up GPU # ${GPU_NUM}"
#export CUDA_VISIBLE_DEVICES=${GPU_NUM}
apptainer run --nv \
    -B ./data:/data \
    -B ./output:/output \
    -B ./legends:/legends \
    -B ./layouts:/layouts \
    -B ./feedback:/feedback \
    -B ./logs:/logs \
    -B /projects/bbym/shared/models/:/checkpoints \
    ${CONTAINER_FILE} \
    -v \
    --inactive_timeout 60 \
    --data /data \
    --output /output \
    --legends /legends \
    --layout /layouts \
    --feedback /feedback \
    --checkpoint_dir /checkpoints \
    --output_types cdr_json raster_masks \
    --log /logs/gpu-${SLURM_JOBID}.log \
    --amqp amqp://ncsa:[email protected]:5672/%2F \
    --model ${INTERNAL_MODEL} &> log.${SLURM_JOBID}_${HOSTNAME}.txt &
#    --model golden_muscat &> log.${SLURM_JOBID}_${HOSTNAME}_${GPU_NUM}.txt &
#    --gpu ${GPU_NUM} \
#    --log /logs/gpu-${SLURM_JOBID}_${GPU_NUM}.log \
#    --output_types cdr_json geopackage raster_masks \

#
#    --idle 60 \
# Record the background PID so we can wait for it below.
JOB_ID="$!"
JOBS="${JOBS} ${JOB_ID}"
#  echo "Started ${JOB_ID} on GPU ${GPU_NUM}"
echo "Started ${JOB_ID}"
#done

# Block until every launched container exits, so Slurm keeps the
# allocation alive for the duration of the pipeline run.
for JOB in ${JOBS}; do
  echo "Waiting for $JOB"
  wait $JOB
done