From ea1ca90b67e8f26f7fca9a76a8461e665b292532 Mon Sep 17 00:00:00 2001
From: Craig P Steffen
Date: Tue, 16 Jul 2024 13:06:44 -0500
Subject: [PATCH 01/27] prelim (hard coded) downloader

Pre-containerization Python downloader code. Everything is hard-coded;
this will be converted to a containerized version.
---
 downloader/CM_B_downloader.py | 259 ++++++++++++++++++++++++++++++++++
 1 file changed, 259 insertions(+)
 create mode 100755 downloader/CM_B_downloader.py

diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py
new file mode 100755
index 0000000..85de361
--- /dev/null
+++ b/downloader/CM_B_downloader.py
@@ -0,0 +1,259 @@
+import pika, sys, getopt, os
+import json
+import time
+#import argparse
+
+# configuration options; should come from environment variables or something
+# this is the base of the working directory in the main (non-image) parallel
+# file system
+my_data_dir="/projects/bbym/shared/CDR_processing/pipeline_processing_003"
+
+print("")
+print("CriticaMAAS B-stage downloader")
+
+download_queue="download"
+
+# name preamble for processing queues, to make it easier for humans to see the
+# function of the processing queues
+process_queue_base = "process_"
+
+# current list of models. Should come from an environment variable
+#process_model_list = ["golden_muscat","flat_iceberg"]
+process_model_list = ["golden_muscat","flat_iceberg","drab_volcano"]
+
+# this is the actual running list of process queues that we will output
+# requests to as we receive requests from CDR
+#process_queue_list=[]
+
+#rabbitmq_uri = "amqp://criticalmaas:keeNgoo1VahthuS4ii1r@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard"
+rabbitmq_uri = "amqp://ncsa:teef1Wor8iey9ohsheic@criticalmaas.ncsa.illinois.edu:5672/%2F"
+
+###############
+# this is only here because the include isn't working
+
+RMQ_username = "criticalmaas"
+RMQ_password = "keeNgoo1VahthuS4ii1r"
+
+#
+##############
+
+def set_up_RMQ(secrets_file):
+    global rabbitmq_uri
+    # global RMQ_username
+#    global RMQ_password
+
+#    if os.path.exists(secrets_file):
+#        execfile(filename)
+#        rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard"
+    return rabbitmq_uri
+
+
+def CDR_download_callback(ch, method, properties, body):
+#    global my_log_dir
+#    global my_input_dir
+#    global my_output_dir
+    global my_data_dir
+
+    global rabbitmq_uri
+
+    global process_model_list
+
+    my_relative_filename=""
+
+    print("***Received:")
+    # catalog from CDR
+    CDR_catalog=json.loads(body)
+    print("got CDR catalog")
+    # print("map name:>>"+my_catalog['map_name']+"<<")
+
+    print("about to print catalog")
+    print(CDR_catalog)
+    print("finished catalog")
+
+    # ack here so the request gets removed from the stack before
+    # downloading; downloading can take minutes
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+    # (FYI: putting the ack here doesn't help the timeout. We will still be susceptible
+    # to the heartbeat timeout if the download time exceeds 60 seconds)
+
+    # download the file first
+    # (perhaps to be replaced by a more python-y alternative from Rob)
+    tif_file_URL=CDR_catalog['cog_url']
+    maparea_file_URL=CDR_catalog['map_area']
+    CDR_model_list=CDR_catalog['models']
+    my_cog_id=CDR_catalog['cog_id']
+
+    # we split the target into intermediate directories to avoid pileup of tons
+    # of entities in a single directory; this is for scaling to a parallel
+    # filesystem with many map requests.
+ split_path=os.path.join(my_cog_id[0:2],my_cog_id[2:4]) + extended_split_path=os.path.join(split_path,my_cog_id) + # this is where the data directory is mounted inside the image + external_data_base_dir_relative="data" + image_data_base_dir_absolute="/"+external_data_base_dir_relative + + # the total path in the image where the data is reference from + # image_data_path=os.path.join(image_data_base_dir_absolute,extended_split_path) + external_data_path=os.path.join(external_data_base_dir_relative,extended_split_path) + tif_data_file_name=my_cog_id+".cog.tif" + maparea_data_file_name=my_cog_id+".cog_area.json" + + # this is the location of the data file within the container, relative to its canonical folder (probably "/data") + # tif_filename_with_path=os.path.join(image_data_path,tif_data_file_name) + tif_filename_with_path=os.path.join(extended_split_path,tif_data_file_name) + maparea_filename_with_path=os.path.join(extended_split_path,maparea_data_file_name) + + # external_data_filename_with_path=os.path.join(external_data_path,tif_data_file_name) + + if maparea_file_URL: + # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL + print("about to run maparea download command: "+DL_command) + os.system(DL_command) + print("finished maparea download command") + + if tif_file_URL: + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL + # check for file before downloading + fetch_file_path=os.path.join(my_data_dir,external_data_path); + fetch_file_components=tif_file_URL.split("/") + fetch_file_total_path=os.path.join(fetch_file_path,fetch_file_components[-1]) + if os.path.isfile(fetch_file_total_path): + print(f"File >{fetch_file_total_path}< already exists! Skipping download.") + else: + print("about to run tif download command: "+DL_command) + os.system(DL_command) + print("finished tif download command") + + # set up message; the message is only specific to the request file, not to + # the model, so we can set up a message to be sent to all of the processing + # queues. 
+ + # construct outgoing request catalog from incoming CDR catalog + # outgoing_message_dictionary={'request_type': "input_file" , 'input_file': CDR_catalog['cog_url'] , 'input_dir': my_input_dir, 'output_dir': my_output_dir, 'model': model_to_process, 'pipeline_image': my_pipeline_image, 'log_dir': my_log_dir} + # outgoing_message_dictionary={'request_type': "input_file" , 'input_file': CDR_catalog['cog_url'] , 'input_dir': my_input_dir, 'output_dir': my_output_dir, 'model': model_to_process, 'log_dir': my_log_dir, 'cog_id': CDR_catalog['cog_id'], 'metadata': CDR_catalog['metadata'], 'results': CDR_catalog['results'] } + + # Pass whole catalog on, with additions to make the inference pipeline work + outgoing_message_dictionary=CDR_catalog; + outgoing_message_dictionary["image_filename"]=tif_filename_with_path + outgoing_message_dictionary["json_filename"]=maparea_filename_with_path + + # then send out processing requests + # for model_to_process in process_model_list: + for model_to_process in CDR_model_list: + my_process_queue = process_queue_base+model_to_process + + outgoing_message=json.dumps(outgoing_message_dictionary) + print("About to send process message:") + print(outgoing_message) + print("finished process message") + parameters = pika.URLParameters(rabbitmq_uri) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=my_process_queue, durable=True) + properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent) + channel.basic_publish(exchange='', routing_key=my_process_queue, body=outgoing_message, properties=properties) + + # exit after a single message processing for testing +# sys.exit(2) + print("pausing") + time.sleep(2) + + + + # result_file=input_dir+my_catalog['map_id']+".cog.tif" + # print("input dir:"+input_dir) + # print("resulting file: "+result_file) + + + + +def main(argv): + my_input_file="" + my_input_dir="" + my_output_dir="" + my_model_name="" + + global my_log_dir + global my_pipeline_image + global download_queue + + global process_queue_list + global process_model_list + global process_queue_base + +# queue=queue_base+"_"+my_model_name + + print("input file:>"+my_input_file+"<") + print("output directory:>"+my_output_dir+"<") + #print("using queue:>"+queue+"<") + +# for each model_name in process_model_list: +# process_queue_list.append(process_queue_base+model_name) + +# print("total processing queue list:") +# print(process_queue_list) +# print("finished printing processing queue list.") + + # set up consumer + rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets") + + parameters = pika.URLParameters(rabbitmq_uri) + print('About to open rabbitMQ connection') + connection = pika.BlockingConnection(parameters) + print('RabbitMQ connection succeeded!') + channel = connection.channel() + channel.queue_declare(queue=download_queue, durable=True) + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue=download_queue, on_message_callback=CDR_download_callback, auto_ack=False) + # presumably this funtion takes us into a wait-process loop forever + print('start consumer loop') + channel.start_consuming() + + # my_catalog=json.load(jfile) + + + # my_request_dictionary={'input': my_input , 'output': my_output} + + # my_message = json.dumps(my_request_dictionary) + + # print("extract from catalog: map_name="+my_catalog[0]["map_name"]) + + # shouldn't get here, but just in case + sys.exit(2) + + my_outgoing_message_dictionary={'map_name': my_catalog[catalog_line]['map_name'] , 
'map_id': my_catalog[catalog_line]['map_id'], 'cog_url': my_catalog[catalog_line]['cog_url']}
+
+
+    if len(my_input_file) > 0:
+        print("creating a single-file dictionary")
+        my_message_dictionary={'request_type': "input_file" , 'input_file': my_input_file , 'input_dir': my_input_dir, 'output_dir': my_output_dir, 'model': my_model_name, 'pipeline_image': my_pipeline_image, 'log_dir': my_log_dir}
+    else:
+        print("creating a directory (multi-file) dictionary")
+        my_message_dictionary={'request_type': "input_dir" , 'input_dir': my_input_dir , 'output_dir': my_output_dir, 'model': my_model_name, 'pipeline_image': my_pipeline_image, 'log_dir': my_log_dir}
+
+    my_message = json.dumps(my_message_dictionary)
+
+    parameters = pika.URLParameters(rabbitmq_uri)
+    connection = pika.BlockingConnection(parameters)
+    channel = connection.channel()
+    channel.queue_declare(queue=queue, durable=True)
+
+    properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent)
+##    message = "hello world"
+    channel.basic_publish(exchange='', routing_key=queue, body=my_message, properties=properties)
+
+# here's the thing we're feeding (from 2024 April 10th):
+# apptainer run --nv -B ./logs:/logs -B /projects/bbym/saxton/MockValData/:/data -B ./output:/output /projects/bbym/shared/continerExchange/criticalmaas-pipeline_latest.sif -v --log /logs/logs.latest --data /data/theFile.tif --legends /data/theFile.json
+# added: --model flat_iceberg
+#
+# apptainer run --nv -B ./logs:/logs -B /projects/bbym/saxton/MockValData/:/data -B ./output:/output /projects/bbym
+
+
+
+    print("finished producer main()")
+
+
+if __name__ == '__main__':
+    main(sys.argv[1:])
+

From afa0093153ddae47d589b20987c04b8bdd199568 Mon Sep 17 00:00:00 2001
From: Craig P Steffen
Date: Tue, 16 Jul 2024 13:21:21 -0500
Subject: [PATCH 02/27] dockerfile and requirements (not tested)

Prelim Dockerfile and requirements for build testing; not tested at all,
just saved to transfer from machine to machine.
---
 downloader/Dockerfile       | 16 ++++++++++++++++
 downloader/requirements.txt |  2 ++
 2 files changed, 18 insertions(+)
 create mode 100644 downloader/Dockerfile
 create mode 100644 downloader/requirements.txt

diff --git a/downloader/Dockerfile b/downloader/Dockerfile
new file mode 100644
index 0000000..9a7543b
--- /dev/null
+++ b/downloader/Dockerfile
@@ -0,0 +1,16 @@
+FROM python:3.11
+
+# environment variables
+ENV RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \
+    PREFIX="" \
+    CDR_TOKEN="" \
+    MAX_SIZE=300
+
+WORKDIR /src
+
+COPY requirements.txt /src/
+RUN pip install -r /src/requirements.txt
+
+COPY . /src/
+
+CMD python CM_B_downloader.py
diff --git a/downloader/requirements.txt b/downloader/requirements.txt
new file mode 100644
index 0000000..2a311e0
--- /dev/null
+++ b/downloader/requirements.txt
@@ -0,0 +1,2 @@
+requests
+pika

From a5ab899c8249e0a2cadad3a52972ce69614915d6 Mon Sep 17 00:00:00 2001
From: Craig Steffen
Date: Thu, 18 Jul 2024 09:50:55 -0500
Subject: [PATCH 03/27] containerize: work from /data

Now works from /data instead of from a hard-coded directory. The
assumption is that /data is an externally mounted-and-managed directory.
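As a minimal sketch of that assumption (the image name and the bind-mounted
host path here are hypothetical, not part of this commit), the container
would be launched with the host working area mounted at /data, along the
lines of:

    docker run -e RABBITMQ_URI="amqp://user:pass@host:5672/%2F" \
        -v /projects/bbym/shared/CDR_processing/pipeline_processing_003:/data \
        criticalmaas-downloader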
---
 downloader/CM_B_downloader.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py
index 85de361..8ab1474 100755
--- a/downloader/CM_B_downloader.py
+++ b/downloader/CM_B_downloader.py
@@ -6,7 +6,9 @@
 # configuration options; should come from environment variables or something
 # this is the base of the working directory in the main (non-image) parallel
 # file system
-my_data_dir="/projects/bbym/shared/CDR_processing/pipeline_processing_003"
+
+#my_data_dir="/projects/bbym/shared/CDR_processing/pipeline_processing_003"
+my_data_dir="/data"
 
 print("")
 print("CriticaMAAS B-stage downloader")
@@ -79,7 +81,7 @@ def CDR_download_callback(ch, method, properties, body):
     # download the file first
     # (perhaps to be replaced by a more python-y alternative from Rob)
     tif_file_URL=CDR_catalog['cog_url']
-    maparea_file_URL=CDR_catalog['map_area']
+    maparea_file_URL=CDR_catalog['map_data']
     CDR_model_list=CDR_catalog['models']
     my_cog_id=CDR_catalog['cog_id']

From 5fb48dcaff2291e4f39aefaad2c4e6d58afca011 Mon Sep 17 00:00:00 2001
From: Craig Steffen
Date: Thu, 18 Jul 2024 09:53:08 -0500
Subject: [PATCH 04/27] adding downloader

Add downloader to build checks. Untested as yet.
---
 .github/workflows/pylint.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
index 2471d64..c53917b 100644
--- a/.github/workflows/pylint.yml
+++ b/.github/workflows/pylint.yml
@@ -12,7 +12,7 @@ jobs:
       fail-fast: false
       matrix:
         python-version: ["3.10", "3.11"]
-        lint-directories: ["./cdrhook", "./tests", "./uploader", "./monitor"]
+        lint-directories: ["./cdrhook", "./tests", "./uploader", "./monitor", "./downloader"]
     steps:
       - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}

From 37fb1e2296ce24a2689a2b2b115cb39884d99d17 Mon Sep 17 00:00:00 2001
From: Craig Steffen
Date: Thu, 18 Jul 2024 09:54:29 -0500
Subject: [PATCH 05/27] adding /data to volumes

Tying the /data directory in to the build.
---
 downloader/Dockerfile | 1 +
 1 file changed, 1 insertion(+)

diff --git a/downloader/Dockerfile b/downloader/Dockerfile
index 9a7543b..ce83cdc 100644
--- a/downloader/Dockerfile
+++ b/downloader/Dockerfile
@@ -7,6 +7,7 @@ ENV RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \
     MAX_SIZE=300
 
 WORKDIR /src
+VOLUME /data
 
 COPY requirements.txt /src/
 RUN pip install -r /src/requirements.txt

From c98e38f6b0adc8e03700e5a993e655357807f458 Mon Sep 17 00:00:00 2001
From: Craig P Steffen
Date: Fri, 19 Jul 2024 10:14:27 -0500
Subject: [PATCH 06/27] docker.yml update for downloader

Adding entries for downloader in parallel with the other containerized
components.
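A sketch of the environment-variable override this commit also adds to the
downloader script (the values shown are hypothetical): CMAAS_LOCAL_DATA_DIR
points the script at an alternate data directory, and
CMAAS_LOCAL_DIR_TEST_FILE names a file to touch there as a writability
check:

    CMAAS_LOCAL_DATA_DIR=/tmp/cmaas_data \
    CMAAS_LOCAL_DIR_TEST_FILE=.write_test \
    python CM_B_downloader.py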
---
 .github/workflows/docker.yml  |  7 ++++++-
 downloader/CM_B_downloader.py | 16 +++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 6089114..6e251f2 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -21,7 +21,8 @@ jobs:
           - cdrhook
           - uploader
           - monitor
-        include:
+        - downloader
+      include:
           - name: cdrhook
             FOLDER: cdrhook
             #PLATFORM: "linux/amd64,linux/arm64"
@@ -35,6 +36,10 @@ jobs:
             FOLDER: monitor
             PLATFORM: "linux/amd64,linux/arm64"
             IMAGE: criticalmaas-monitor
+      - name: downloader
+        FOLDER: downloader
+        PLATFORM: "linux/amd64,linux/arm64"
+        IMAGE: criticalmaas-downloader
 
     steps:
       - uses: actions/checkout@v4
diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py
index 8ab1474..c7b3ebd 100755
--- a/downloader/CM_B_downloader.py
+++ b/downloader/CM_B_downloader.py
@@ -8,7 +8,21 @@
 # file system
 
 #my_data_dir="/projects/bbym/shared/CDR_processing/pipeline_processing_003"
-my_data_dir="/data"
+try:
+    my_local_dir=os.environ['CMAAS_LOCAL_DATA_DIR']
+    my_data_dir=my_local_dir
+except KeyError:
+    my_data_dir="/data"
+
+print(f'Data dir is {my_data_dir}')
+
+try:
+    test_file_filename=os.environ['CMAAS_LOCAL_DIR_TEST_FILE']
+    dir_test_file=os.path.join(my_data_dir,test_file_filename)
+    open(dir_test_file,'a')
+except KeyError:
+    a='2'
+
 
 print("")
 print("CriticaMAAS B-stage downloader")

From 8f0ff95768e0125eb7e5f6e6cf9ce561dd6f9518 Mon Sep 17 00:00:00 2001
From: asaxton
Date: Mon, 22 Jul 2024 12:49:52 -0500
Subject: [PATCH 07/27] adjusted indentation

---
 .github/workflows/docker.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 6e251f2..80fd74c 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -21,7 +21,7 @@ jobs:
           - cdrhook
           - uploader
           - monitor
-        - downloader
+          - downloader
       include:
           - name: cdrhook
             FOLDER: cdrhook

From d4a8790b07a5de4aed0e558ae7af4911b9d50c24 Mon Sep 17 00:00:00 2001
From: asaxton
Date: Mon, 22 Jul 2024 12:55:40 -0500
Subject: [PATCH 08/27] another adjusted indentation

---
 .github/workflows/docker.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 80fd74c..e6231f9 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -36,7 +36,7 @@ jobs:
             FOLDER: monitor
             PLATFORM: "linux/amd64,linux/arm64"
             IMAGE: criticalmaas-monitor
-      - name: downloader
+          - name: downloader
        FOLDER: downloader
        PLATFORM: "linux/amd64,linux/arm64"
        IMAGE: criticalmaas-downloader

From ac6ed7f6a1c7745eeec9c7055e4046893c56a59e Mon Sep 17 00:00:00 2001
From: asaxton
Date: Mon, 22 Jul 2024 15:55:53 -0500
Subject: [PATCH 09/27] yet another adjusted indentation

---
 .github/workflows/docker.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index e6231f9..0acbcdc 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -22,7 +22,7 @@ jobs:
           - uploader
           - monitor
           - downloader
-      include:
+        include:
           - name: cdrhook
             FOLDER: cdrhook

From 15a9a9affa63c6a0cdae65f9867563958dde69f4 Mon Sep 17 00:00:00 2001
From: asaxton
Date: Mon, 22 Jul 2024 15:59:01 -0500
Subject: [PATCH 10/27] hopefully the last adjusted indentation

---
 .github/workflows/docker.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/docker.yml
b/.github/workflows/docker.yml index 0acbcdc..9135feb 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -37,9 +37,9 @@ jobs: PLATFORM: "linux/amd64,linux/arm64" IMAGE: criticalmaas-monitor - name: downloader - FOLDER: downloader - PLATFORM: "linux/amd64,linux/arm64" - IMAGE: criticalmaas-downloader + FOLDER: downloader + PLATFORM: "linux/amd64,linux/arm64" + IMAGE: criticalmaas-downloader steps: - uses: actions/checkout@v4 From 3c2b7f57bc879d120a4483953bcbddfb2626dc74 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Tue, 6 Aug 2024 13:30:11 -0500 Subject: [PATCH 11/27] map_data.json filename fix Quick fix; statically fixes the name of the json file. Doesn't fix the real logic of extracting the name from the incoming packet (which would future-proof this). --- downloader/CM_B_downloader.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index c7b3ebd..c6b0773 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -112,7 +112,8 @@ def CDR_download_callback(ch, method, properties, body): # image_data_path=os.path.join(image_data_base_dir_absolute,extended_split_path) external_data_path=os.path.join(external_data_base_dir_relative,extended_split_path) tif_data_file_name=my_cog_id+".cog.tif" - maparea_data_file_name=my_cog_id+".cog_area.json" +# maparea_data_file_name=my_cog_id+".cog_area.json" + maparea_data_file_name=my_cog_id+".map_data.json" # this is the location of the data file within the container, relative to its canonical folder (probably "/data") # tif_filename_with_path=os.path.join(image_data_path,tif_data_file_name) From fd350280906e6f3a86499c23f161edb12ad4230a Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Tue, 13 Aug 2024 13:29:49 -0500 Subject: [PATCH 12/27] downloader working local; tested Downloader working and tested in local mode. Seems to interact correctly with rabbitmq. --- downloader/CM_B_downloader.py | 169 +++++++++++++++++++++++++++++++--- 1 file changed, 158 insertions(+), 11 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index c6b0773..ec7ca37 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -2,6 +2,9 @@ import json import time #import argparse +import threading +from requests.exceptions import RequestException +import logging # configuration options; should come from environment variables or something # this is the base of the working directory in the main (non-image) parallel @@ -27,7 +30,8 @@ print("") print("CriticaMAAS B-stage downloader") -download_queue="download" +download_main_queue="download" +download_error_queue="download.error" # name preamble for processing queues, to make it easier for humans to see the # function of the processing queues @@ -63,7 +67,99 @@ def set_up_RMQ(secrets_file): # rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard" return rabbitmq_uri - +# the downloader worker class +class DL_worker(threading.Thread): + output_model_list = [] + output_message_dictionary = {} + + def process(self, method, properties, body_in): + self.method = method + self.properties = properties + self.body_in = body_in + self.exception = None + + def run(self): + try: + CDR_catalog = json.loads(self.body_in) + print("about to print catalog") + print(CDR_catalog) + print("finished catalog") + + # I don't think I need to, or even can, do this. 
I don't think
+            # I have the right kind of handle
+            # ch.basic_ack(delivery_tag=method.delivery_tag)
+
+
+            # grabbing relevant portions of the incoming message.
+            tif_file_URL=CDR_catalog['cog_url']
+            maparea_file_URL=CDR_catalog['map_data']
+            CDR_model_list=CDR_catalog['models']
+            my_cog_id=CDR_catalog['cog_id']
+
+            # figure out filenames and paths for everything
+
+            # we split the target into intermediate directories to avoid pileup of tons
+            # of entities in a single directory; this is for scaling to a parallel
+            # filesystem with many map requests.
+            split_path=os.path.join(my_cog_id[0:2],my_cog_id[2:4])
+            relative_file_location=os.path.join(split_path,my_cog_id)
+            # this is where the data directory is mounted inside the image
+            data_location_base="data"
+#            image_base_dir_absolute="/"+data_location_relative
+
+            # the total path in the image where the data is referenced from
+            external_data_path=os.path.join(data_location_base,relative_file_location)
+            tif_data_file_name=my_cog_id+".cog.tif"
+            # maparea_data_file_name=my_cog_id+".cog_area.json"
+            maparea_data_file_name=my_cog_id+".map_data.json"
+
+            # here we have the actual paths for the files we're going to (potentially) download
+            tif_filename_with_path=os.path.join(external_data_path,tif_data_file_name)
+            maparea_filename_with_path=os.path.join(external_data_path,maparea_data_file_name)
+
+            # if the incoming message specified a maparea file, get it
+            if maparea_file_URL:
+                # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one.
+                DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL
+                print("about to run maparea download command: "+DL_command)
+                os.system(DL_command)
+                #time.sleep(2)
+                print("finished maparea download command")
+
+            # if the incoming message specified an image file, get it
+            if tif_file_URL:
+                DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL
+                # check for file before downloading
+                fetch_file_path=os.path.join(my_data_dir,external_data_path);
+                fetch_file_components=tif_file_URL.split("/")
+                fetch_file_total_path=os.path.join(fetch_file_path,fetch_file_components[-1])
+                if os.path.isfile(fetch_file_total_path):
+                    print(f"File >{fetch_file_total_path}< already exists! Skipping download.")
+                else:
+                    print("about to run tif download command: "+DL_command)
+                    os.system(DL_command)
+                    #time.sleep(5)
+                    print("finished tif download command")
+
+            # construct and send processing orders based on the incoming message, and
+            # what we downloaded.
For completeness, we also include the entire incoming + # catalog for easier tracking + outgoing_message_dictionary=CDR_catalog; + outgoing_message_dictionary["image_filename"]=tif_filename_with_path + outgoing_message_dictionary["json_filename"]=maparea_filename_with_path + self.output_message_dictionary=outgoing_message_dictionary + for model_to_process in CDR_model_list: + self.output_model_list.append(model_to_process) + + except RequestException as e: + logging.exception(f"Request Error {response.text}.") + self.exception = e + except Exception as e: + logging.exception("Error processing pipeline request.") + self.exception = e + + + def CDR_download_callback(ch, method, properties, body): # global my_log_dir # global my_input_dir @@ -203,15 +299,7 @@ def main(argv): print("input file:>"+my_input_file+"<") print("output directory:>"+my_output_dir+"<") - #print("using queue:>"+queue+"<") - -# for each model_name in process_model_list: -# process_queue_list.append(process_queue_base+model_name) -# print("total processing queue list:") -# print(process_queue_list) -# print("finished printing processing queue list.") - # set up consumer rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets") @@ -220,8 +308,67 @@ def main(argv): connection = pika.BlockingConnection(parameters) print('RabbitMQ connection succeeded!') channel = connection.channel() - channel.queue_declare(queue=download_queue, durable=True) + + # from this point, following the template from the uploader + channel.queue_declare(queue=download_main_queue, durable=True) + channel.queue_declare(queue=download_error_queue, durable=True) + # We only know for sure our incoming download queue and its error queue. + # Queues for models are defined according to incoming messages, so we + # don't pre-declare them here. + + # Defining prefetch count here to be 1 means that if we're processing something but + # we haven't closed it out, and we asked for another, we get an empty response. 
channel.basic_qos(prefetch_count=1)
+
+    # here's where the flow bifurcates from my original non-threaded downloader
+
+    # consumer iterator to pull messages off the download queue
+    consumer = channel.consume(queue=download_main_queue, inactivity_timeout=1)
+
+    worker = None
+    while True:
+        # grab the next message from the download queue
+        # We only get a real message when there is a message waiting and we've
+        # closed out the last one
+        method, properties, body = next(consumer)
+        if method:
+            # if there was a new message with real information in it,
+            # we fire off a worker with its contents
+            worker = DL_worker()
+            worker.process(method, properties, body)
+            worker.start()
+
+        if worker:
+            if not worker.is_alive():
+                # if the worker has finished its task, we grab its contents, send out
+                # its messages, and then close it out to the queue iterator
+                my_model_list = worker.output_model_list
+                my_output_dictionary = worker.output_message_dictionary
+
+                if worker.exception:
+                    error_data['exception'] = repr(worker.exception)
+                    channel.basic_publish(exchange='', routing_key=download_error_queue, body=json.dumps(error_data), properties=worker.properties)
+                else:
+                    # send a process message for each model in the list
+                    for model_to_process in my_model_list:
+                        print(f"sending process request for model {model_to_process}")
+                        my_process_queue = process_queue_base+model_to_process
+                        outgoing_message=json.dumps(my_output_dictionary)
+                        parameters = pika.URLParameters(rabbitmq_uri)
+                        connection = pika.BlockingConnection(parameters)
+                        channel = connection.channel()
+                        channel.queue_declare(queue=my_process_queue, durable=True)
+                        properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent)
+                        channel.basic_publish(exchange='', routing_key=my_process_queue, body=outgoing_message, properties=properties)
+                        channel.basic_ack(delivery_tag=worker.method.delivery_tag)
+                        worker = None
+
+
+    print ("should never get here! Exiting!")
+    sys.exit(2)
+
     channel.queue_declare(queue=download_queue, durable=True)
 
     channel.basic_consume(queue=download_queue, on_message_callback=CDR_download_callback, auto_ack=False)
     # presumably this funtion takes us into a wait-process loop forever
     print('start consumer loop')
     channel.start_consuming()

From 1748d32c44f5d8f76d0a289be3f373794c7521ca Mon Sep 17 00:00:00 2001
From: Craig P Steffen
Date: Tue, 13 Aug 2024 13:43:27 -0500
Subject: [PATCH 13/27] downloader URI from env

Downloader script now gets the URI from the shell environment. This MUST
be set or the downloader won't work.
---
 downloader/CM_B_downloader.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py
index ec7ca37..52b2f72 100755
--- a/downloader/CM_B_downloader.py
+++ b/downloader/CM_B_downloader.py
@@ -45,8 +45,8 @@
 # requests to as we receive requests from CDR
 #process_queue_list=[]
 
-#rabbitmq_uri = "amqp://criticalmaas:keeNgoo1VahthuS4ii1r@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard"
-rabbitmq_uri = "amqp://ncsa:teef1Wor8iey9ohsheic@criticalmaas.ncsa.illinois.edu:5672/%2F"
+# RABBITMQ_URI *must* be defined in the incoming environment otherwise all the rabbitmq features will not work.
+rabbitmq_uri = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/%2F") ############### # this is only here because the include isn't working From edf9dcd8e577e99389e6308113605079b6cb18e7 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Tue, 13 Aug 2024 13:50:06 -0500 Subject: [PATCH 14/27] rm cruft in main main() runs in a "while True" loop. Cleaning up a bunch of stuff after that, which will never run but was bothering the syntax validator. --- downloader/CM_B_downloader.py | 48 ----------------------------------- 1 file changed, 48 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 52b2f72..cd22f21 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -367,55 +367,7 @@ def main(argv): print ("should never get here! Exiting!") sys.exit(2) - channel.queue_declare(queue=download_queue, durable=True) - channel.basic_consume(queue=download_queue, on_message_callback=CDR_download_callback, auto_ack=False) - # presumably this funtion takes us into a wait-process loop forever - print('start consumer loop') - channel.start_consuming() - - # my_catalog=json.load(jfile) - - - # my_request_dictionary={'input': my_input , 'output': my_output} - - # my_message = json.dumps(my_request_dictionary) - - # print("extract from catalog: map_name="+my_catalog[0]["map_name"]) - - # shouldn't get here, but just in case - sys.exit(2) - - my_outgoing_message_dictionary={'map_name': my_catalog[catalog_line]['map_name'] , 'map_id': my_catalog[catalog_line]['map_id'], 'cog_url': my_catalog[catalog_line]['cog_url']} - - - if len(my_input_file) > 0: - print("creating a single-file dictionary") - my_message_dictionary={'request_type': "input_file" , 'input_file': my_input_file , 'input_dir': my_input_dir, 'output_dir': my_output_dir, 'model': my_model_name, 'pipeline_image': my_pipeline_image, 'log_dir': my_log_dir} - else: - print("creating a directory (multi-file) dictionary") - my_message_dictionary={'request_type': "input_dir" , 'input_dir': my_input_dir , 'output_dir': my_output_dir, 'model': my_model_name, 'pipeline_image': my_pipeline_image, 'log_dir': my_log_dir} - - my_message = json.dumps(my_message_dictionary) - - parameters = pika.URLParameters(rabbitmq_uri) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=queue, durable=True) - - properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent) -## message = "hello world" - channel.basic_publish(exchange='', routing_key=queue, body=my_message, properties=properties) - -# here's the thing we're feeding (from 2024 April 10th): -# apptainer run --nv -B ./logs:/logs -B /projects/bbym/saxton/MockValData/:/data -B ./output:/output /projects/bbym/shared/continerExchange/criticalmaas-pipeline_latest.sif -v --log /logs/logs.latest --data /data/theFile.tif --legends /data/theFile.json -# added: --model flat_iceberg -# -# apptainer run --nv -B ./logs:/logs -B /projects/bbym/saxton/MockValData/:/data -B ./output:/output /projects/bbym - - - - print("finished producer main()") if __name__ == '__main__': From 62202b9ccfab421fca375cf7016d05992c80416d Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Tue, 13 Aug 2024 15:40:33 -0500 Subject: [PATCH 15/27] fix exeption flow Removed a catch clause that probably wasn't doing anything and probably wans't needed. Fixed a variable error that crept in from cut and paste. 
Fixed grouping so that basic_ack and worker=None were at the right level in the main cleanup loop. --- downloader/CM_B_downloader.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index cd22f21..4cd247e 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -151,11 +151,11 @@ def run(self): for model_to_process in CDR_model_list: self.output_model_list.append(model_to_process) - except RequestException as e: - logging.exception(f"Request Error {response.text}.") - self.exception = e +# except RequestException as e: +# logging.exception(f"Request Error {response.text}.") +# self.exception = e except Exception as e: - logging.exception("Error processing pipeline request.") + logging.exception("Error processing download .") self.exception = e @@ -346,8 +346,8 @@ def main(argv): my_output_dictionary = worker.output_message_dictionary if worker.exception: - error_data['exception'] = repr(worker.exception) - channel.basic_publish(exchange='', routing_key=download_error_queue, body=json.dumps(error_data), properties=worker.properties) + my_output_dictionary['exception'] = repr(worker.exception) + channel.basic_publish(exchange='', routing_key=download_error_queue, body=json.dumps(my_output_dictionary), properties=worker.properties) else: # send a process message for each model in the list for model_to_process in my_model_list: @@ -360,8 +360,8 @@ def main(argv): channel.queue_declare(queue=my_process_queue, durable=True) properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent) channel.basic_publish(exchange='', routing_key=my_process_queue, body=outgoing_message, properties=properties) - channel.basic_ack(delivery_tag=worker.method.delivery_tag) - worker = None + channel.basic_ack(delivery_tag=worker.method.delivery_tag) + worker = None print ("should never get here! Exiting!") From 9e728b8de205d92b5bfa11fa0bd9e50b60cd067f Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Tue, 13 Aug 2024 16:05:16 -0500 Subject: [PATCH 16/27] URI def cleanup Cleaned out some deprecated URI definition code that wasn't used any more. --- downloader/CM_B_downloader.py | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 4cd247e..d859d3f 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -48,25 +48,6 @@ # RABBITMQ_URI *must* be defined in the incoming environment otherwise all the rabbitmq features will not work. 
rabbitmq_uri = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/%2F") -############### -# this is only here because the include isn't working - -RMQ_username = "criticalmaas" -RMQ_password = "keeNgoo1VahthuS4ii1r" - -# -############## - -def set_up_RMQ(secrets_file): - global rabbitmq_uri - # global RMQ_username -# global RMQ_password - -# if os.path.exists(secrets_file): -# execfile(filename) -# rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard" - return rabbitmq_uri - # the downloader worker class class DL_worker(threading.Thread): output_model_list = [] @@ -287,6 +268,7 @@ def main(argv): my_output_dir="" my_model_name="" + global rabbitmq_uri global my_log_dir global my_pipeline_image global download_queue @@ -300,9 +282,6 @@ def main(argv): print("input file:>"+my_input_file+"<") print("output directory:>"+my_output_dir+"<") - # set up consumer - rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets") - parameters = pika.URLParameters(rabbitmq_uri) print('About to open rabbitMQ connection') connection = pika.BlockingConnection(parameters) From 8b843ae6f650d5dd2946ad1d11aaf19080f5ed51 Mon Sep 17 00:00:00 2001 From: asaxton Date: Wed, 14 Aug 2024 21:59:13 -0500 Subject: [PATCH 17/27] removed extra pika calls in process model loop --- downloader/CM_B_downloader.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index d859d3f..2b25bba 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -60,6 +60,7 @@ def process(self, method, properties, body_in): self.exception = None def run(self): + self.output_model_list.clear() try: CDR_catalog = json.loads(self.body_in) print("about to print catalog") @@ -333,12 +334,8 @@ def main(argv): print(f"sending process request for model {model_to_process}") my_process_queue = process_queue_base+model_to_process outgoing_message=json.dumps(my_output_dictionary) - parameters = pika.URLParameters(rabbitmq_uri) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() channel.queue_declare(queue=my_process_queue, durable=True) - properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent) - channel.basic_publish(exchange='', routing_key=my_process_queue, body=outgoing_message, properties=properties) + channel.basic_publish(exchange='', routing_key=my_process_queue, body=outgoing_message, properties=worker.properties) channel.basic_ack(delivery_tag=worker.method.delivery_tag) worker = None From 040f1a03e2ac4eabb4f6be3b321a77a6971aaa39 Mon Sep 17 00:00:00 2001 From: asaxton Date: Wed, 14 Aug 2024 22:26:03 -0500 Subject: [PATCH 18/27] adding some logging --- downloader/CM_B_downloader.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 2b25bba..bb90a80 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -310,6 +310,7 @@ def main(argv): # grab the next message from the download queue # We only get a real message when there is a message waiting and we've # closed out the last one + logging.debug("loop") method, properties, body = next(consumer) if method: # if there was a new message with real information in it, @@ -327,11 +328,13 @@ def main(argv): if worker.exception: my_output_dictionary['exception'] = repr(worker.exception) - channel.basic_publish(exchange='', routing_key=download_error_queue, 
body=json.dumps(my_output_dictionary), properties=worker.properties) + channel.basic_publish(exchange='', routing_key=download_error_queue, + body=json.dumps(my_output_dictionary), + properties=worker.properties) else: # send a process message for each model in the list for model_to_process in my_model_list: - print(f"sending process request for model {model_to_process}") + logging.debug(f"sending process request for model {model_to_process}") my_process_queue = process_queue_base+model_to_process outgoing_message=json.dumps(my_output_dictionary) channel.queue_declare(queue=my_process_queue, durable=True) @@ -347,5 +350,11 @@ def main(argv): if __name__ == '__main__': + logging.basicConfig(format='%(asctime)-15s [%(threadName)-15s] %(levelname)-7s :' + ' %(name)s - %(message)s', + level=logging.getLevelName(os.getenv("LOGLEVEL", "INFO").upper())) + logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARN) + logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN) + logging.getLogger('pika').setLevel(logging.WARN) main(sys.argv[1:]) From 639b0ab73b8122b5704fe9e7747ff5c2402beaef Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 07:55:35 -0500 Subject: [PATCH 19/27] print -> logging --- downloader/CM_B_downloader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index bb90a80..0b521b7 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -63,9 +63,9 @@ def run(self): self.output_model_list.clear() try: CDR_catalog = json.loads(self.body_in) - print("about to print catalog") - print(CDR_catalog) - print("finished catalog") + logging.debug("about to print catalog") + logging.debug(CDR_catalog) + logging.debug("finished catalog") # I don't think I need to, or even can, do this. I don't think # I have the right kind of handle From 4d6e017a66d7ca73d78e897c8f96422cc8199cb2 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 08:38:25 -0500 Subject: [PATCH 20/27] full print cleanup All print() statements replaced with (mostly) logging.debug, or in two cases, logging.info. Tested outside of container. --- downloader/CM_B_downloader.py | 65 +++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 0b521b7..1f993e8 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -17,7 +17,10 @@ except KeyError: my_data_dir="/data" -print(f'Data dir is {my_data_dir}') +#print(f'Data dir is {my_data_dir}') + +#logging.debug("Data dir is:") +#logging.debug(my_data_dir) try: test_file_filename=os.environ['CMAAS_LOCAL_DIR_TEST_FILE'] @@ -27,8 +30,8 @@ a='2' -print("") -print("CriticaMAAS B-stage downloader") +#print("") +#print("CriticaMAAS B-stage downloader") download_main_queue="download" download_error_queue="download.error" @@ -77,7 +80,8 @@ def run(self): maparea_file_URL=CDR_catalog['map_data'] CDR_model_list=CDR_catalog['models'] my_cog_id=CDR_catalog['cog_id'] - + logging.info(f"Processing COG {my_cog_id}") + # figure out filenames and paths for everything # we split the target into intermediate directories to avoid pileup of tons @@ -103,10 +107,10 @@ def run(self): if maparea_file_URL: # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. 
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL - print("about to run maparea download command: "+DL_command) + logging.debug("about to run maparea download command: "+DL_command) os.system(DL_command) #time.sleep(2) - print("finished maparea download command") + logging.debug("finished maparea download command") # if the incoming message specified an image file, get it if tif_file_URL: @@ -116,12 +120,12 @@ def run(self): fetch_file_components=tif_file_URL.split("/") fetch_file_total_path=os.path.join(fetch_file_path,fetch_file_components[-1]) if os.path.isfile(fetch_file_total_path): - print(f"File >{fetch_file_total_path}< already exists! Skipping download.") + logging.debug(f"File >{fetch_file_total_path}< already exists! Skipping download.") else: - print("about to run tif download command: "+DL_command) + logging.debug("about to run tif download command: "+DL_command) os.system(DL_command) #time.sleep(5) - print("finished tif download command") + logging.debug("finished tif download command") # construct and send processing orders based on the incoming message, and # what we downloaded. For completeness, we also include the entire incoming @@ -154,15 +158,15 @@ def CDR_download_callback(ch, method, properties, body): my_relative_filename="" - print("***Received:") + logging.debug("***Received:") # catalog from CDR CDR_catalog=json.loads(body) - print("got CDR catalog") + logging.debug("got CDR catalog") # print("map name:>>"+my_catalog['map_name']+"<<") - print("about to print catalog") - print(CDR_catalog) - print("finished catalog") + logging.debug("about to print catalog") + logging.debug(CDR_catalog) + logging.debug("finished catalog") # ack here so the request gets removed from the stack before # downloading; downloading can be minutes @@ -203,9 +207,9 @@ def CDR_download_callback(ch, method, properties, body): if maparea_file_URL: # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL - print("about to run maparea download command: "+DL_command) + logging.debug("about to run maparea download command: "+DL_command) os.system(DL_command) - print("finished maparea download command") + logging.debug("finished maparea download command") if tif_file_URL: DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL @@ -214,11 +218,11 @@ def CDR_download_callback(ch, method, properties, body): fetch_file_components=tif_file_URL.split("/") fetch_file_total_path=os.path.join(fetch_file_path,fetch_file_components[-1]) if os.path.isfile(fetch_file_total_path): - print(f"File >{fetch_file_total_path}< already exists! Skipping download.") + logging.debug(f"File >{fetch_file_total_path}< already exists! 
Skipping download.") else: - print("about to run tif download command: "+DL_command) + logging.debug("about to run tif download command: "+DL_command) os.system(DL_command) - print("finished tif download command") + logging.debug("finished tif download command") # set up message; the message is only specific to the request file, not to # the model, so we can set up a message to be sent to all of the processing @@ -239,9 +243,9 @@ def CDR_download_callback(ch, method, properties, body): my_process_queue = process_queue_base+model_to_process outgoing_message=json.dumps(outgoing_message_dictionary) - print("About to send process message:") - print(outgoing_message) - print("finished process message") + logging.debug("About to send process message:") + logging.debug(outgoing_message) + logging.debug("finished process message") parameters = pika.URLParameters(rabbitmq_uri) connection = pika.BlockingConnection(parameters) channel = connection.channel() @@ -251,8 +255,8 @@ def CDR_download_callback(ch, method, properties, body): # exit after a single message processing for testing # sys.exit(2) - print("pausing") - time.sleep(2) +# print("pausing") +# time.sleep(2) @@ -278,15 +282,18 @@ def main(argv): global process_model_list global process_queue_base -# queue=queue_base+"_"+my_model_name + # queue=queue_base+"_"+my_model_name + + logging.debug(f'Data directory is {my_data_dir}') + - print("input file:>"+my_input_file+"<") - print("output directory:>"+my_output_dir+"<") +# print("input file:>"+my_input_file+"<") +# print("output directory:>"+my_output_dir+"<") parameters = pika.URLParameters(rabbitmq_uri) - print('About to open rabbitMQ connection') + logging.debug('About to open rabbitMQ connection') connection = pika.BlockingConnection(parameters) - print('RabbitMQ connection succeeded!') + logging.info('RabbitMQ connection succeeded!') channel = connection.channel() # from this point, following the template from the uploader From d1707f89a127498c5811d6a22515a45f21e9a17e Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 08:48:12 -0500 Subject: [PATCH 21/27] wget output -> /dev/null Flushing wget output so it's not cluttering up the command line output. --- downloader/CM_B_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 1f993e8..b63f7d5 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -106,7 +106,7 @@ def run(self): # if the incoming message specified a maparea file, get it if maparea_file_URL: # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. 
- DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL+" >& /dev/null" logging.debug("about to run maparea download command: "+DL_command) os.system(DL_command) #time.sleep(2) @@ -114,7 +114,7 @@ def run(self): # if the incoming message specified an image file, get it if tif_file_URL: - DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL+" >& /dev/null" # check for file before downloading fetch_file_path=os.path.join(my_data_dir,external_data_path); fetch_file_components=tif_file_URL.split("/") From 123d7c144acdee50589a17cffd73658a5efb7d00 Mon Sep 17 00:00:00 2001 From: asaxton Date: Thu, 15 Aug 2024 09:37:47 -0500 Subject: [PATCH 22/27] making image and json path in rabbit message relative to first two of cog --- downloader/CM_B_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index bb90a80..f1817d3 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -96,8 +96,8 @@ def run(self): maparea_data_file_name=my_cog_id+".map_data.json" # here we have the actual paths for the files we're going to (potentially) download - tif_filename_with_path=os.path.join(external_data_path,tif_data_file_name) - maparea_filename_with_path=os.path.join(external_data_path,maparea_data_file_name) + tif_filename_with_path=os.path.join(relative_file_location,tif_data_file_name) + maparea_filename_with_path=os.path.join(relative_file_location,maparea_data_file_name) # if the incoming message specified a maparea file, get it if maparea_file_URL: From 5ff8149c50db58139904bcdfc73e5bde0d6e96a8 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 10:16:57 -0500 Subject: [PATCH 23/27] /dev/null -> /tmp/trash.out Apparently /dev/null doesn't play well in containers. Replacing with ordinary file name. --- downloader/CM_B_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 340e81d..8898c61 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -106,7 +106,7 @@ def run(self): # if the incoming message specified a maparea file, get it if maparea_file_URL: # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. 
- DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL+" >& /dev/null" + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL+" >& /tmp/trash.out" logging.debug("about to run maparea download command: "+DL_command) os.system(DL_command) #time.sleep(2) @@ -114,7 +114,7 @@ def run(self): # if the incoming message specified an image file, get it if tif_file_URL: - DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL+" >& /dev/null" + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL+" >& /tmp/trash.out" # check for file before downloading fetch_file_path=os.path.join(my_data_dir,external_data_path); fetch_file_components=tif_file_URL.split("/") From 1ba559e3307787a1913863080b537d53c44888e0 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 10:18:51 -0500 Subject: [PATCH 24/27] base location "data" -> "" Tweaking mount points and making download agree with pipeline. --- downloader/CM_B_downloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 8898c61..657d571 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -90,7 +90,7 @@ def run(self): split_path=os.path.join(my_cog_id[0:2],my_cog_id[2:4]) relative_file_location=os.path.join(split_path,my_cog_id) # this is where the data directory is mounted inside the image - data_location_base="data" + data_location_base="" # image_base_dir_absolute="/"+data_location_relative # the total path in the image where the data is reference from From 86aba5098c7277a76f2e8c810a3ecfef5dbe4e4a Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 10:21:03 -0500 Subject: [PATCH 25/27] wget -> wget -q Cleaning up wget output in easiest way. --- downloader/CM_B_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 657d571..9a6bd37 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -106,7 +106,7 @@ def run(self): # if the incoming message specified a maparea file, get it if maparea_file_URL: # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. 
- DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL+" >& /tmp/trash.out" + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget -q "+maparea_file_URL+" >& /tmp/trash.out" logging.debug("about to run maparea download command: "+DL_command) os.system(DL_command) #time.sleep(2) @@ -114,7 +114,7 @@ def run(self): # if the incoming message specified an image file, get it if tif_file_URL: - DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL+" >& /tmp/trash.out" + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget -q "+tif_file_URL+" >& /tmp/trash.out" # check for file before downloading fetch_file_path=os.path.join(my_data_dir,external_data_path); fetch_file_components=tif_file_URL.split("/") From b43dc40c00e2fd13ba2e67c31eb0214fb7ec358d Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Thu, 15 Aug 2024 10:34:11 -0500 Subject: [PATCH 26/27] remove redirects Apparently redirecting output, even to a benign file, causes heartburn for apptainer. Removing redirects. --- downloader/CM_B_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/downloader/CM_B_downloader.py b/downloader/CM_B_downloader.py index 9a6bd37..69d7da6 100755 --- a/downloader/CM_B_downloader.py +++ b/downloader/CM_B_downloader.py @@ -106,7 +106,7 @@ def run(self): # if the incoming message specified a maparea file, get it if maparea_file_URL: # rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one. - DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget -q "+maparea_file_URL+" >& /tmp/trash.out" + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget -q "+maparea_file_URL logging.debug("about to run maparea download command: "+DL_command) os.system(DL_command) #time.sleep(2) @@ -114,7 +114,7 @@ def run(self): # if the incoming message specified an image file, get it if tif_file_URL: - DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget -q "+tif_file_URL+" >& /tmp/trash.out" + DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget -q "+tif_file_URL # check for file before downloading fetch_file_path=os.path.join(my_data_dir,external_data_path); fetch_file_components=tif_file_URL.split("/") From 0aa400fafbb4007fc80d30efca6194722328f203 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Fri, 16 Aug 2024 15:41:26 -0400 Subject: [PATCH 27/27] Aug 16 release About to merge new release with downloader. --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80be153..9172de2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [unreleased] - 2024-08-16 + +### Added +- Added downloader + ## [0.8.0] - 2024-08-06 ### Added