Skip to content

Commit

Permalink
Merge branch 'downloader_initial' of github.com:DARPA-CRITICALMAAS/uiuc-cdr into downloader_initial
Browse files Browse the repository at this point in the history
  • Loading branch information
asaxton committed Aug 15, 2024
2 parents 123d7c1 + d1707f8 commit feaf6fc
Showing 1 changed file with 41 additions and 34 deletions.
75 changes: 41 additions & 34 deletions downloader/CM_B_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
except KeyError:
my_data_dir="/data"

print(f'Data dir is {my_data_dir}')
#print(f'Data dir is {my_data_dir}')

#logging.debug("Data dir is:")
#logging.debug(my_data_dir)

try:
test_file_filename=os.environ['CMAAS_LOCAL_DIR_TEST_FILE']
Expand All @@ -27,8 +30,8 @@
a='2'


print("")
print("CriticaMAAS B-stage downloader")
#print("")
#print("CriticaMAAS B-stage downloader")

download_main_queue="download"
download_error_queue="download.error"
Expand Down Expand Up @@ -63,9 +66,9 @@ def run(self):
self.output_model_list.clear()
try:
CDR_catalog = json.loads(self.body_in)
print("about to print catalog")
print(CDR_catalog)
print("finished catalog")
logging.debug("about to print catalog")
logging.debug(CDR_catalog)
logging.debug("finished catalog")

# I don't think I need to, or even can, do this. I don't think
# I have the right kind of handle
Expand All @@ -77,7 +80,8 @@ def run(self):
maparea_file_URL=CDR_catalog['map_data']
CDR_model_list=CDR_catalog['models']
my_cog_id=CDR_catalog['cog_id']

logging.info(f"Processing COG {my_cog_id}")

# figure out filenames and paths for everything

# we split the target into intermediate directories to avoid pileup of tons
Expand All @@ -102,26 +106,26 @@ def run(self):
# if the incoming message specified a maparea file, get it
if maparea_file_URL:
# rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one.
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL
print("about to run maparea download command: "+DL_command)
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL+" >& /dev/null"
logging.debug("about to run maparea download command: "+DL_command)
os.system(DL_command)
#time.sleep(2)
print("finished maparea download command")
logging.debug("finished maparea download command")

# if the incoming message specified an image file, get it
if tif_file_URL:
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL+" >& /dev/null"
# check for file before downloading
fetch_file_path=os.path.join(my_data_dir,external_data_path);
fetch_file_components=tif_file_URL.split("/")
fetch_file_total_path=os.path.join(fetch_file_path,fetch_file_components[-1])
if os.path.isfile(fetch_file_total_path):
print(f"File >{fetch_file_total_path}< already exists! Skipping download.")
logging.debug(f"File >{fetch_file_total_path}< already exists! Skipping download.")
else:
print("about to run tif download command: "+DL_command)
logging.debug("about to run tif download command: "+DL_command)
os.system(DL_command)
#time.sleep(5)
print("finished tif download command")
logging.debug("finished tif download command")

# construct and send processing orders based on the incoming message, and
# what we downloaded. For completeness, we also include the entire incoming
Expand Down Expand Up @@ -154,15 +158,15 @@ def CDR_download_callback(ch, method, properties, body):

my_relative_filename=""

print("***Received:")
logging.debug("***Received:")
# catalog from CDR
CDR_catalog=json.loads(body)
print("got CDR catalog")
logging.debug("got CDR catalog")
# print("map name:>>"+my_catalog['map_name']+"<<")

print("about to print catalog")
print(CDR_catalog)
print("finished catalog")
logging.debug("about to print catalog")
logging.debug(CDR_catalog)
logging.debug("finished catalog")

# ack here so the request gets removed from the stack before
# downloading; downloading can be minutes
Expand Down Expand Up @@ -203,9 +207,9 @@ def CDR_download_callback(ch, method, properties, body):
if maparea_file_URL:
# rm -f is because there's no clean way to tell wget to overwrite files; so this guarantees that the file is the newest downloaded one.
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; rm -f "+maparea_data_file_name+" ; wget "+maparea_file_URL
print("about to run maparea download command: "+DL_command)
logging.debug("about to run maparea download command: "+DL_command)
os.system(DL_command)
print("finished maparea download command")
logging.debug("finished maparea download command")

if tif_file_URL:
DL_command="cd "+my_data_dir+" ; mkdir -p "+external_data_path+" ; cd "+external_data_path+" ; wget "+tif_file_URL
Expand All @@ -214,11 +218,11 @@ def CDR_download_callback(ch, method, properties, body):
fetch_file_components=tif_file_URL.split("/")
fetch_file_total_path=os.path.join(fetch_file_path,fetch_file_components[-1])
if os.path.isfile(fetch_file_total_path):
print(f"File >{fetch_file_total_path}< already exists! Skipping download.")
logging.debug(f"File >{fetch_file_total_path}< already exists! Skipping download.")
else:
print("about to run tif download command: "+DL_command)
logging.debug("about to run tif download command: "+DL_command)
os.system(DL_command)
print("finished tif download command")
logging.debug("finished tif download command")

# set up message; the message is only specific to the request file, not to
# the model, so we can set up a message to be sent to all of the processing
Expand All @@ -239,9 +243,9 @@ def CDR_download_callback(ch, method, properties, body):
my_process_queue = process_queue_base+model_to_process

outgoing_message=json.dumps(outgoing_message_dictionary)
print("About to send process message:")
print(outgoing_message)
print("finished process message")
logging.debug("About to send process message:")
logging.debug(outgoing_message)
logging.debug("finished process message")
parameters = pika.URLParameters(rabbitmq_uri)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
Expand All @@ -251,8 +255,8 @@ def CDR_download_callback(ch, method, properties, body):

# exit after a single message processing for testing
# sys.exit(2)
print("pausing")
time.sleep(2)
# print("pausing")
# time.sleep(2)



Expand All @@ -278,15 +282,18 @@ def main(argv):
global process_model_list
global process_queue_base

# queue=queue_base+"_"+my_model_name
# queue=queue_base+"_"+my_model_name

logging.debug(f'Data directory is {my_data_dir}')


print("input file:>"+my_input_file+"<")
print("output directory:>"+my_output_dir+"<")
# print("input file:>"+my_input_file+"<")
# print("output directory:>"+my_output_dir+"<")

parameters = pika.URLParameters(rabbitmq_uri)
print('About to open rabbitMQ connection')
logging.debug('About to open rabbitMQ connection')
connection = pika.BlockingConnection(parameters)
print('RabbitMQ connection succeeded!')
logging.info('RabbitMQ connection succeeded!')
channel = connection.channel()

# from this point, following the template from the uploader
Expand Down

0 comments on commit feaf6fc

Please sign in to comment.