From f76dc82c34b65870df8fdf7705740e6aad6feb84 Mon Sep 17 00:00:00 2001
From: Craig P Steffen
Date: Tue, 6 Aug 2024 13:42:31 -0500
Subject: [PATCH 1/6] cluster utility script examples

An example of a job script from the UIUC "Hydro" cluster, and an example
of a Python on-demand launcher that listens to the RabbitMQ process
queue and launches a job if there is anything to process.
---
 scripts/on_demand_Golden_Muscat_example.py | 203 +++++++++++++++++++++
 scripts/on_demand_Golden_Muscat_job.bash   |  79 ++++++++
 2 files changed, 282 insertions(+)
 create mode 100644 scripts/on_demand_Golden_Muscat_example.py
 create mode 100644 scripts/on_demand_Golden_Muscat_job.bash

diff --git a/scripts/on_demand_Golden_Muscat_example.py b/scripts/on_demand_Golden_Muscat_example.py
new file mode 100644
index 0000000..98d4272
--- /dev/null
+++ b/scripts/on_demand_Golden_Muscat_example.py
@@ -0,0 +1,203 @@
+import pika,sys,getopt
+import os
+import json
+import subprocess
+import time
+
+
+#input_queue = "download"
+#input_queue = "process_flat_iceberg"
+input_queue = "process_golden_muscat"
+#input_queue = "upload"
+#output_queue = "send_to_processing"
+#input_queue = "test_upload"
+primary_batch_script="/projects/bbym/shared/CDR_processing/pipeline_processing_003/golden_muscat_launcher.bash"
+#secondary_batch_scripts=["/projects/bbym/shared/CDR_processing/pipeline_processing_003/drab_volcano_launcher.bash"]
+
+global input_dir
+input_dir=""
+global output_dir
+output_dir=""
+
+###############
+# this is only here because the include isn't working
+
+RMQ_username = "criticalmaas"
+RMQ_password = "keeNgoo1VahthuS4ii1r"
+
+#
+##############
+
+def how_many_messages_in_queue(secrets_file,queue_name):
+    rabbitmq_uri=set_up_RMQ(secrets_file)
+
+    parameters = pika.URLParameters(rabbitmq_uri)
+    connection = pika.BlockingConnection(parameters)
+    channel = connection.channel()
+    my_queue=channel.queue_declare(queue=queue_name,durable=True)
+    # capture the queue depth before closing the connection
+    return_val=my_queue.method.message_count
+    connection.close()
+    return return_val
+
+
+
+
+"""
+def launch_job_return_ID_special(batch_file):
+    global secondary_batch_scripts
+    # run for launching when it's the *first* job launched in a while
+    # This function also fires off a couple of secondary jobs that are
+    # expected to run for a short time but then will terminate, leaving
+    # the main ones running for this program to keep track of.
+    for aux_batch_file in secondary_batch_scripts:
+        print("running secondary script")
+        subprocess.run(['sbatch', aux_batch_file], stdout=subprocess.PIPE)
+    return launch_job_return_ID(batch_file)
+"""
+def launch_job_return_ID(batch_file):
+    sbatch_result=subprocess.run(['sbatch', batch_file], stdout=subprocess.PIPE)
+    sbatch_string=sbatch_result.stdout.decode('utf-8')
+    sbatch_output_list=sbatch_string.split()
+    # (hopefully) returns the jobid of the new job
+    return sbatch_output_list[3]
+
+def how_many_of_my_jobs_running(job_list):
+    job_total=0
+    # iterate over a copy: removing entries from job_list while iterating
+    # over it directly would skip the element after each removal
+    for check_job in list(job_list):
+        squeue_result=subprocess.run(['squeue'], stdout=subprocess.PIPE)
+        squeue_string=squeue_result.stdout.decode('utf-8')
+        sq_line_list=squeue_string.splitlines()
+        is_job_running=0
+        for myline in sq_line_list:
+            segment_list = myline.split()
+            # print(f"first segment:{segment_list[0]}")
+            current_job = segment_list[0]
+            if check_job == current_job:
+                job_total += 1
+                is_job_running=1
+                #print(f" ****match!: {check_job} {current_job}")
+            #else:
+                #print(f"no match: {check_job} {current_job}")
+        # if the job isn't in the squeue output, then it's not running,
+        # so we can remove it from the job list
+        if is_job_running == 0:
+            job_list.remove(check_job)
+            print(f"removing job {check_job} from the job list")
+    return job_total
+
+def set_up_RMQ(secrets_file):
+    global RMQ_username
+    global RMQ_password
+
+    secrets_path=os.path.expanduser(secrets_file)
+    if os.path.exists(secrets_path):
+        # execfile() was removed in Python 3 (and "filename" was never
+        # defined here); exec the secrets file contents instead
+        with open(secrets_path) as secrets:
+            exec(secrets.read(), globals())
+
+    # rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard"
+    rabbitmq_uri = "amqp://ncsa:teef1Wor8iey9ohsheic@criticalmaas.ncsa.illinois.edu:5672/%2F"
+    return rabbitmq_uri
+
+"""
+def callback(ch, method, properties, body):
+    print("***Received:")
+    my_catalog=json.loads(body)
+    print("got catalog")
+    print(my_catalog)
+    # print("map name:>>"+my_catalog['map_name']+"<<")
+    """
+
+"""
+    file_URL=my_catalog['cog_url']
+    DL_command="cd "+input_dir+" ; wget "+file_URL
+    print("download command: "+DL_command)
+"""
+
+"""
+    # print("about to run download command")
+    # os.system(DL_command)
+    # print("finished download command")
+
+# result_file=input_dir+my_catalog['cog_id']+".cog.tif"
+    print("input dir:"+input_dir)
+# print("resulting file: "+result_file)
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+    print('about to sleep for 0.2 seconds')
+    time.sleep(.2)
+    print('finished ack')
+# sys.exit(2)
+"""
+
+def main(argv):
+    # secrets file per-user
+    global input_queue
+    global primary_batch_script
+
+    input_queue_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue)
+    print(f"Queue has {input_queue_count} items.")
+
+    job_list=[]
+    # job_ID=launch_job_return_ID(primary_batch_script)
+    #print(f"launched job {job_ID}")
+    #job_list.append(job_ID)
+
+    #print("about to sleep")
+    #time.sleep(2)
+
+    #for i in range(180):
+    #    my_job_count=how_many_of_my_jobs_running(job_list)
+    #    print(f"There are {my_job_count} of our jobs running ({i}).")
+    #    time.sleep(1)
+
+    while True:
+        target_job_count = 0
+        process_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue)
+        if process_count > 0:
+            target_job_count += 1
+        if process_count > 10:
+            target_job_count += 1
+        if process_count > 20:
+            target_job_count += 1
+        if process_count > 30:
+            target_job_count += 2
+        currently_running=how_many_of_my_jobs_running(job_list)
+        print(f"Status (prelaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}")
+        jobs_launched=0
+        while currently_running < target_job_count:
+            job_ID=launch_job_return_ID(primary_batch_script)
+            print(f"Launching job {job_ID}")
+            job_list.append(job_ID)
+            currently_running=how_many_of_my_jobs_running(job_list)
+            jobs_launched += 1
+        print(f"Status (postlaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}")
+        time.sleep(300)
+#        time.sleep(15)
+
+
+    print("exiting for debugging.")
+    sys.exit(2)
+
+
+
+
+    """
+    rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets")
+
+    parameters = pika.URLParameters(rabbitmq_uri)
+    connection = pika.BlockingConnection(parameters)
+    channel = connection.channel()
+    channel.queue_declare(queue=input_queue, durable=True)
+
+    channel.basic_qos(prefetch_count=1)
+    channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=False)
+    """
+    print("Input data directory:"+input_dir)
+    print("Output data directory:"+output_dir)
+    print("Now beginning consuming loop:")
+
+    """
+    channel.start_consuming()
+    """
+
+if __name__ == '__main__':
+    main(sys.argv[1:])
+# main()
+
diff --git a/scripts/on_demand_Golden_Muscat_job.bash b/scripts/on_demand_Golden_Muscat_job.bash
new file mode 100644
index 0000000..369afaf
--- /dev/null
+++ b/scripts/on_demand_Golden_Muscat_job.bash
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+#SBATCH -p a100
+#SBATCH -A bbym-hydro
+#SBATCH --time 08:00:00
+#SBATCH --mail-type=BEGIN,END
+#SBATCH --mail-user=craigsteffen@gmail.com
+##SBATCH --cpus-per-task=56
+#SBATCH -o CM_golden_muscat-%j.out
+#SBATCH -e CM_golden_muscat-%j.err
+INTERNAL_MODEL="golden_muscat"
+
+cd "/projects/bbym/shared/CDR_processing/pipeline_processing_003"
+
+# old URI
+# --amqp amqp://guest:guest@rabbitmqserver:5672/%2F \
+
+CONTAINER_FILE="./criticalmaas-pipeline_latest.sif"
+
+if [ ! -e ${CONTAINER_FILE} ]; then
+    echo "container file ${CONTAINER_FILE} does not exist! Exiting."
+    exit 1
+    # apptainer pull criticalmaas-pipeline.sif docker://ncsa/criticalmaas-pipeline:rabbitmq
+fi
+
+mkdir -p data output validation legends layouts logs feedback
+
+#export CDR_SYSTEM="UIUC"
+#export CDR_SYSTEM_VERSION="0.4.1"
+
+JOBS=""
+GPUS=$(nvidia-smi --list-gpus | wc -l)
+echo "gpu count: ${GPUS}"
+echo "GPU list finished"
+
+#for GPU_NUM in $(seq 0 $(( $GPUS - 1 )) ); do
+    # --layout /layouts \
+    # --validation /validation \
+#    echo "setting up GPU # ${GPU_NUM}"
+#export CUDA_VISIBLE_DEVICES=${GPU_NUM}
+apptainer run --nv \
+    -B ./data:/data \
+    -B ./output:/output \
+    -B ./legends:/legends \
+    -B ./layouts:/layouts \
+    -B ./feedback:/feedback \
+    -B ./logs:/logs \
+    -B /projects/bbym/shared/models/:/checkpoints \
+    ${CONTAINER_FILE} \
+    -v \
+    --inactive_timeout 60 \
+    --data /data \
+    --output /output \
+    --legends /legends \
+    --layout /layouts \
+    --feedback /feedback \
+    --checkpoint_dir /checkpoints \
+    --output_types cdr_json raster_masks \
+    --log /logs/gpu-${SLURM_JOBID}.log \
+    --amqp amqp://ncsa:teef1Wor8iey9ohsheic@criticalmaas.ncsa.illinois.edu:5672/%2F \
+    --model ${INTERNAL_MODEL} &> log.${SLURM_JOBID}_${HOSTNAME}.txt &
+#    --model golden_muscat &> log.${SLURM_JOBID}_${HOSTNAME}_${GPU_NUM}.txt &
+#    --gpu ${GPU_NUM} \
+#    --log /logs/gpu-${SLURM_JOBID}_${GPU_NUM}.log \
+#    --output_types cdr_json geopackage raster_masks \
+
+#
+#    --idle 60 \
+JOB_ID="$!"
+JOBS="${JOBS} ${JOB_ID}" +# echo "Started ${JOB_ID} on GPU ${GPU_NUM}" +echo "Started ${JOB_ID}" +#done + +for JOB in ${JOBS}; do + echo "Waiting for $JOB" + wait $JOB +done From ab637e531968ddd06d87ee5f7b6576a5a029fecc Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Wed, 21 Aug 2024 08:15:33 -0500 Subject: [PATCH 2/6] goldenM and icyR launcher scripts 0th order production version (tested, with hard-coded paths from Hydro) of the on-demand launcher scripts for Golden Muscat and Icy Resin. --- scripts/manage_goldenM_00.py | 203 ++++++++++++++++++++++++++++++++++ scripts/manage_icyR_00.py | 204 +++++++++++++++++++++++++++++++++++ 2 files changed, 407 insertions(+) create mode 100644 scripts/manage_goldenM_00.py create mode 100644 scripts/manage_icyR_00.py diff --git a/scripts/manage_goldenM_00.py b/scripts/manage_goldenM_00.py new file mode 100644 index 0000000..e538f17 --- /dev/null +++ b/scripts/manage_goldenM_00.py @@ -0,0 +1,203 @@ +import pika,sys,getopt +import os +import json +import subprocess +import time + + +#input_queue = "download" +#input_queue = "process_flat_iceberg" +input_queue = "process_golden_muscat" +#input_queue = "upload" +#output_queue = "send_to_processing" +#input_queue = "test_upload" +primary_batch_script="/projects/bbym/shared/CDR_processing/pipeline_processing_003/golden_muscat_launcher.bash" +#secondary_batch_scripts=["/projects/bbym/shared/CDR_processing/pipeline_processing_003/drab_volcano_launcher.bash"] + +global input_dir +input_dir="" +global output_dir +output_dir="" + +############### +# this is only here because the include isn't working + +RMQ_username = "criticalmaas" +RMQ_password = "keeNgoo1VahthuS4ii1r" + +# +############## + +def how_many_messages_in_queue(secrets_file,queue_name): + rabbitmq_uri=set_up_RMQ(secrets_file) + + parameters = pika.URLParameters(rabbitmq_uri) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + my_queue=channel.queue_declare(queue=queue_name,durable=True) + return_val=my_queue.method.message_count + connection.close() + return my_queue.method.message_count + + + + +""" +def launch_job_return_ID_special(batch_file): + global secondary_batch_scripts + # run for launching when it's the *first* job launched in a while + # This function also fires off a couple of secondary jobs that are + # expected to run for a short time but then will terminate, leaving + # the main ones running for this program to keep track of. 
+ for aux_batch_file in secondary_batch_scripts: + print("running secondary script") + subprocess.run(['sbatch', aux_batch_file], stdout=subprocess.PIPE) + return launch_job_return_ID(batch_file) +""" +def launch_job_return_ID(batch_file): + sbatch_result=subprocess.run(['sbatch', batch_file], stdout=subprocess.PIPE) + sbatch_string=sbatch_result.stdout.decode('utf-8') + sbatch_output_list=sbatch_string.split() + # (hopefully) returns the jobid of the new job + return sbatch_output_list[3] + +def how_many_of_my_jobs_running(job_list): + job_total=0 + for check_job in job_list: + squeue_result=subprocess.run(['squeue'], stdout=subprocess.PIPE) + squeue_string=squeue_result.stdout.decode('utf-8') + sq_line_list=squeue_string.splitlines() + is_job_running=0 + for myline in sq_line_list: + segment_list = myline.split() + # print(f"first segment:{segment_list[0]}") + current_job = segment_list[0] + if check_job == current_job: + job_total += 1 + is_job_running=1 + #print(f" ****match!: {check_job} {current_job}") + #else: + #print(f"no match: {check_job} {current_job}") + # if the job isn't in the job list, then it's not running, so we can + # remove it from the job list + if is_job_running == 0: + job_list.remove(check_job) + print(f"removing job {check_job} from the job list") + return job_total + +def set_up_RMQ(secrets_file): + global RMQ_username + global RMQ_password + + if os.path.exists(secrets_file): + execfile(filename) + + # rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard" + rabbitmq_uri = "amqp://ncsa:teef1Wor8iey9ohsheic@criticalmaas.ncsa.illinois.edu:5672/%2F" + return rabbitmq_uri + +""" +def callback(ch, method, properties, body): + print("***Received:") + my_catalog=json.loads(body) + print("got catalog") + print(my_catalog) + # print("map name:>>"+my_catalog['map_name']+"<<") + """ + +""" + file_URL=my_catalog['cog_url'] + DL_command="cd "+input_dir+" ; wget "+file_URL + print("download command: "+DL_command) +""" + +""" + # print("about to run download command") + # os.system(DL_command) + # print("finished download command") + +# result_file=input_dir+my_catalog['cog_id']+".cog.tif" + print("input dir:"+input_dir) +# print("resulting file: "+result_file) + ch.basic_ack(delivery_tag=method.delivery_tag) + print('about to sleep for 1 seconds') + time.sleep(.2) + print('finished ack') +# sys.exit(2) +""" + +def main(argv): + # secrets file per-user + global input_queue + global primary_batch_script + + input_queue_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue) + print(f"Queue has {input_queue_count} items.") + + job_list=[] + # job_ID=launch_job_return_ID(primary_batch_script) + #print(f"launched job {job_ID}") + #job_list.append(job_ID) + + #print("about to sleep") + #time.sleep(2) + + #for i in range(180): + # my_job_count=how_many_of_my_jobs_running(job_list) + # print(f"There are {my_job_count} of our jobs running ({i}).") + # time.sleep(1) + + while True : + target_job_count = 0 + process_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue) + if process_count > 0: + target_job_count += 1 + if process_count > 10: + target_job_count += 1 + if process_count > 20: + target_job_count += 1 + if process_count > 30: + target_job_count += 2 + currently_running=how_many_of_my_jobs_running(job_list) + print(f"Status (prelaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}") + jobs_launched=0 + while 
currently_running < target_job_count : + job_ID=launch_job_return_ID(primary_batch_script) + print(f"Launching job {job_ID}") + job_list.append(job_ID) + currently_running=how_many_of_my_jobs_running(job_list) + jobs_launched += 1 + print(f"Status (postlaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}") + time.sleep(60) +# time.sleep(15) + + + print("exiting for debugging.") + sys.exit(2) + + + + + """ + rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets") + + parameters = pika.URLParameters(rabbitmq_uri) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=input_queue, durable=True) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=False) + """ + print("Input data directory:"+input_dir) + print("Output data directory:"+output_dir) + print("Now beginning consuming loop:") + + """ + channel.start_consuming() + """ + +if __name__ == '__main__': + main(sys.argv[1:]) +# main() + diff --git a/scripts/manage_icyR_00.py b/scripts/manage_icyR_00.py new file mode 100644 index 0000000..d141788 --- /dev/null +++ b/scripts/manage_icyR_00.py @@ -0,0 +1,204 @@ +import pika,sys,getopt +import os +import json +import subprocess +import time + + +#input_queue = "download" +#input_queue = "process_flat_iceberg" +#input_queue = "process_golden_muscat" +input_queue = "process_icy_resin" +#input_queue = "upload" +#output_queue = "send_to_processing" +#input_queue = "test_upload" +primary_batch_script="/projects/bbym/shared/CDR_processing/pipeline_processing_003/icy_resin_launcher.bash" +#secondary_batch_scripts=["/projects/bbym/shared/CDR_processing/pipeline_processing_003/drab_volcano_launcher.bash"] + +global input_dir +input_dir="" +global output_dir +output_dir="" + +############### +# this is only here because the include isn't working + +RMQ_username = "criticalmaas" +RMQ_password = "keeNgoo1VahthuS4ii1r" + +# +############## + +def how_many_messages_in_queue(secrets_file,queue_name): + rabbitmq_uri=set_up_RMQ(secrets_file) + + parameters = pika.URLParameters(rabbitmq_uri) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + my_queue=channel.queue_declare(queue=queue_name,durable=True) + return_val=my_queue.method.message_count + connection.close() + return my_queue.method.message_count + + + + +""" +def launch_job_return_ID_special(batch_file): + global secondary_batch_scripts + # run for launching when it's the *first* job launched in a while + # This function also fires off a couple of secondary jobs that are + # expected to run for a short time but then will terminate, leaving + # the main ones running for this program to keep track of. 
+ for aux_batch_file in secondary_batch_scripts: + print("running secondary script") + subprocess.run(['sbatch', aux_batch_file], stdout=subprocess.PIPE) + return launch_job_return_ID(batch_file) +""" +def launch_job_return_ID(batch_file): + sbatch_result=subprocess.run(['sbatch', batch_file], stdout=subprocess.PIPE) + sbatch_string=sbatch_result.stdout.decode('utf-8') + sbatch_output_list=sbatch_string.split() + # (hopefully) returns the jobid of the new job + return sbatch_output_list[3] + +def how_many_of_my_jobs_running(job_list): + job_total=0 + for check_job in job_list: + squeue_result=subprocess.run(['squeue'], stdout=subprocess.PIPE) + squeue_string=squeue_result.stdout.decode('utf-8') + sq_line_list=squeue_string.splitlines() + is_job_running=0 + for myline in sq_line_list: + segment_list = myline.split() + # print(f"first segment:{segment_list[0]}") + current_job = segment_list[0] + if check_job == current_job: + job_total += 1 + is_job_running=1 + #print(f" ****match!: {check_job} {current_job}") + #else: + #print(f"no match: {check_job} {current_job}") + # if the job isn't in the job list, then it's not running, so we can + # remove it from the job list + if is_job_running == 0: + job_list.remove(check_job) + print(f"removing job {check_job} from the job list") + return job_total + +def set_up_RMQ(secrets_file): + global RMQ_username + global RMQ_password + + if os.path.exists(secrets_file): + execfile(filename) + + # rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard" + rabbitmq_uri = "amqp://ncsa:teef1Wor8iey9ohsheic@criticalmaas.ncsa.illinois.edu:5672/%2F" + return rabbitmq_uri + +""" +def callback(ch, method, properties, body): + print("***Received:") + my_catalog=json.loads(body) + print("got catalog") + print(my_catalog) + # print("map name:>>"+my_catalog['map_name']+"<<") + """ + +""" + file_URL=my_catalog['cog_url'] + DL_command="cd "+input_dir+" ; wget "+file_URL + print("download command: "+DL_command) +""" + +""" + # print("about to run download command") + # os.system(DL_command) + # print("finished download command") + +# result_file=input_dir+my_catalog['cog_id']+".cog.tif" + print("input dir:"+input_dir) +# print("resulting file: "+result_file) + ch.basic_ack(delivery_tag=method.delivery_tag) + print('about to sleep for 1 seconds') + time.sleep(.2) + print('finished ack') +# sys.exit(2) +""" + +def main(argv): + # secrets file per-user + global input_queue + global primary_batch_script + + input_queue_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue) + print(f"Queue has {input_queue_count} items.") + + job_list=[] + # job_ID=launch_job_return_ID(primary_batch_script) + #print(f"launched job {job_ID}") + #job_list.append(job_ID) + + #print("about to sleep") + #time.sleep(2) + + #for i in range(180): + # my_job_count=how_many_of_my_jobs_running(job_list) + # print(f"There are {my_job_count} of our jobs running ({i}).") + # time.sleep(1) + + while True : + target_job_count = 0 + process_count=how_many_messages_in_queue("~/.criticalmaas/secrets",input_queue) + if process_count > 0: + target_job_count += 1 + if process_count > 10: + target_job_count += 1 + if process_count > 20: + target_job_count += 1 + if process_count > 30: + target_job_count += 2 + currently_running=how_many_of_my_jobs_running(job_list) + print(f"Status (prelaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}") + jobs_launched=0 + while 
currently_running < target_job_count : + job_ID=launch_job_return_ID(primary_batch_script) + print(f"Launching job {job_ID}") + job_list.append(job_ID) + currently_running=how_many_of_my_jobs_running(job_list) + jobs_launched += 1 + print(f"Status (postlaunch): downloads to process={process_count}, current jobs running={currently_running}, target_jobs={target_job_count}") + time.sleep(60) +# time.sleep(15) + + + print("exiting for debugging.") + sys.exit(2) + + + + + """ + rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets") + + parameters = pika.URLParameters(rabbitmq_uri) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=input_queue, durable=True) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=False) + """ + print("Input data directory:"+input_dir) + print("Output data directory:"+output_dir) + print("Now beginning consuming loop:") + + """ + channel.start_consuming() + """ + +if __name__ == '__main__': + main(sys.argv[1:]) +# main() + From 00b53133b9fcb6b5794f00d1b65b1218afdfabed Mon Sep 17 00:00:00 2001 From: Rob Kooper Date: Thu, 22 Aug 2024 16:32:59 -0400 Subject: [PATCH 3/6] added 2 scripts to launch most pieces --- scripts/README.md | 8 ++++ scripts/download_upload.sh | 82 ++++++++++++++++++++++++++++++++++++++ scripts/model_launcher.sh | 36 +++++++++++++++++ 3 files changed, 126 insertions(+) create mode 100644 scripts/README.md create mode 100644 scripts/download_upload.sh create mode 100644 scripts/model_launcher.sh diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..d7695f9 --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,8 @@ +# Utility scripts + +This folder contains utility scripts that are used on the HPC to monitor the queues and start models on the HPC. + +*upload_download.sh* - This script will download and start 2 singulatiry containers (if not already started) and tail the output logs. +*model_launcher.sh* - This script will check if any models are needed to be run and start them. This will launch about 1 pipeline for every 10 waiting jobs. + +Both scripts require a files called secrets.sh to be in the same folder. If it does not exist, it will print a message if it does not exist. diff --git a/scripts/download_upload.sh b/scripts/download_upload.sh new file mode 100644 index 0000000..970a80b --- /dev/null +++ b/scripts/download_upload.sh @@ -0,0 +1,82 @@ +#!/bin/bash + +if [ ! -e secrets.sh ]; then + cat < 5 ? 5 : ${NEEDED} )) + if [ $RUNNING -lt $NEEDED ]; then + echo -en "${SKIP}Starting another pipeline for $queue. " + SKIP="" + sbatch --job-name ${queue} "/projects/bbym/shared/CDR_processing/pipeline_processing_003/${queue}_launcher.bash" + fi + LOG="${LOG} [$queue : Running=$RUNNING jobs=$JOBS need=$NEEDED] " + done + echo -ne "${LOG} \r" + sleep 1 +done From c0e3b9192610211584a43d1127506220e23cda12 Mon Sep 17 00:00:00 2001 From: Rob Kooper Date: Sun, 25 Aug 2024 21:18:30 -0500 Subject: [PATCH 4/6] use apptainer, not singularity --- scripts/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/README.md b/scripts/README.md index d7695f9..d5851f0 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -2,7 +2,7 @@ This folder contains utility scripts that are used on the HPC to monitor the queues and start models on the HPC. -*upload_download.sh* - This script will download and start 2 singulatiry containers (if not already started) and tail the output logs. 
+*upload_download.sh* - This script will download and start 2 apptainer containers (if not already started) and tail the output logs.
 *model_launcher.sh* - This script will check if any models are needed to be run and start them. This will launch about 1 pipeline for every 10 waiting jobs.
 
 Both scripts require a files called secrets.sh to be in the same folder. If it does not exist, it will print a message if it does not exist.

From 705d63efbdf1f8b9cf9f917893c1101b5b5d6cf7 Mon Sep 17 00:00:00 2001
From: abodeuis <54557445+abodeuis@users.noreply.github.com>
Date: Thu, 3 Oct 2024 09:16:49 -0500
Subject: [PATCH 5/6] Small grammar correction in README

---
 scripts/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/README.md b/scripts/README.md
index d5851f0..616a7c3 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -1,6 +1,6 @@
 # Utility scripts
 
-This folder contains utility scripts that are used on the HPC to monitor the queues and start models on the HPC.
+This folder contains utility scripts that are used on the HPC system to monitor queues and start models.
 
 *upload_download.sh* - This script will download and start 2 apptainer containers (if not already started) and tail the output logs.
 *model_launcher.sh* - This script will check if any models are needed to be run and start them. This will launch about 1 pipeline for every 10 waiting jobs.

From abd2aecb23d23e5cad30aaa05b16778ee8be43d8 Mon Sep 17 00:00:00 2001
From: Aaron Saxton
Date: Fri, 4 Oct 2024 13:54:22 -0500
Subject: [PATCH 6/6] Elaborating on secrets.sh description in README

---
 scripts/README.md | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/scripts/README.md b/scripts/README.md
index 616a7c3..8c86568 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -5,4 +5,10 @@ This folder contains utility scripts that are used on the HPC system to monitor
 *upload_download.sh* - This script will download and start 2 apptainer containers (if not already started) and tail the output logs.
 *model_launcher.sh* - This script will check if any models are needed to be run and start them. This will launch about 1 pipeline for every 10 waiting jobs.
 
-Both scripts require a files called secrets.sh to be in the same folder. If it does not exist, it will print a message if it does not exist.
+Both scripts require a file called secrets.sh to be in the same folder. If it does not exist, they will print a message that states the required variables needed in secrets.sh. An example of the contents of secrets.sh is:
+```
+# required variables
+export CDR_TOKEN=this_is_a_secret_received_from_cdr
+export RABBITMQ_URI=amqp://username:password@server.url:5672/%2F
+export MONITOR_URL=https://server.url/monitor/queues.json
+```
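
Notes on the launchers above, with illustrative sketches (names below are not functions from the patches).

The on-demand launchers poll the RabbitMQ queue depth by re-declaring the queue with durable=True, which also creates the queue if it is missing. A minimal read-only probe can use a passive declare instead, which only asks the broker for the queue status:

```python
import pika

def messages_in_queue(rabbitmq_uri, queue_name):
    # passive=True returns the queue's declare-ok (including its current
    # depth) without creating or re-declaring the queue; it raises
    # ChannelClosedByBroker if the queue does not exist
    connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_uri))
    try:
        channel = connection.channel()
        status = channel.queue_declare(queue=queue_name, passive=True)
        return status.method.message_count
    finally:
        connection.close()
```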
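
The launcher scripts recover the job ID by position from sbatch's "Submitted batch job NNN" banner ("(hopefully) returns the jobid") and run one full squeue per tracked job. A sketch of a tighter variant, assuming a Slurm version with sbatch's --parsable flag (prints only "jobid[;cluster]") and the standard -h/-u/-o squeue options:

```python
import getpass
import subprocess

def launch_job_return_id(batch_file):
    # --parsable avoids fishing the job ID out of the human-readable banner
    result = subprocess.run(['sbatch', '--parsable', batch_file],
                            stdout=subprocess.PIPE, check=True)
    return result.stdout.decode('utf-8').strip().split(';')[0]

def my_running_jobs(job_list):
    # one squeue call for everything, restricted to the current user;
    # -h drops the header line, -o %i prints bare job IDs
    result = subprocess.run(
        ['squeue', '-h', '-u', getpass.getuser(), '-o', '%i'],
        stdout=subprocess.PIPE, check=True)
    active = set(result.stdout.decode('utf-8').split())
    # prune finished jobs in place so the caller's list stays accurate
    job_list[:] = [job for job in job_list if job in active]
    return len(job_list)
```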
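
In manage_goldenM_00.py and manage_icyR_00.py (PATCH 2), set_up_RMQ() calls execfile(filename) — a Python 2 builtin, with filename never defined — so the per-user secrets file is never read and the hard-coded RMQ_username/RMQ_password are always used. A sketch of a Python 3 loader, assuming the secrets file contains Python assignments (RMQ_username = "...", RMQ_password = "..."), which is what execfile() implied; the defaults here are placeholders, not the real credentials:

```python
import os

RMQ_username = "changeme"
RMQ_password = "changeme"

def set_up_rmq(secrets_file="~/.criticalmaas/secrets"):
    path = os.path.expanduser(secrets_file)
    if os.path.exists(path):
        with open(path) as handle:
            # exec() is the Python 3 analogue of execfile(); it rebinds
            # RMQ_username/RMQ_password in module globals if the file sets them
            exec(handle.read(), globals())
    return (f"amqp://{RMQ_username}:{RMQ_password}"
            "@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard")
```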