Skip to content

Commit

Permalink
downloader working local; tested
Browse files Browse the repository at this point in the history
Downloader working and tested in local mode.  Seems to interact
correctly with rabbitmq.
  • Loading branch information
craigsteffen committed Aug 13, 2024
1 parent 3c2b7f5 commit fd35028
Showing 1 changed file with 158 additions and 11 deletions.
169 changes: 158 additions & 11 deletions downloader/CM_B_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import json
import time
#import argparse
import threading
from requests.exceptions import RequestException
import logging

# configuration options; should come from environment variables or something
# this is the base of the working directory in the main (non-image) parallel
Expand All @@ -27,7 +30,8 @@
# startup banner printed when the module is loaded
print("")
print("CriticaMAAS B-stage downloader")

# RabbitMQ queue names used by the downloader.
# download_queue is still referenced by the older callback-based consumer
# setup further down in main().
download_queue="download"
# queue the threaded consumer loop in main() pulls download requests from
download_main_queue="download"
# queue that main() publishes to when a worker finishes with an exception
download_error_queue="download.error"

# name preamble for processing queues, to make it easier for humans to see the
# function of the processing queues
Expand Down Expand Up @@ -63,7 +67,99 @@ def set_up_RMQ(secrets_file):
# rabbitmq_uri = f"amqp://{RMQ_username}:{RMQ_password}@rabbitmq.criticalmaas.software-dev.ncsa.illinois.edu:5672/shepard"
return rabbitmq_uri


# the downloader worker class
class DL_worker(threading.Thread):
    """Worker thread that handles one incoming download message.

    Usage, as done by the consumer loop: create an instance, hand it the
    message with ``process()``, then ``start()`` it.  Once the thread has
    finished, the caller reads:

    * ``exception`` -- ``None`` on success, otherwise the exception raised
      while handling the message;
    * ``output_model_list`` -- the model names from the message's
      ``'models'`` entry;
    * ``output_message_dictionary`` -- the incoming catalog plus the
      ``"image_filename"`` and ``"json_filename"`` entries.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Results are per-instance.  They used to be class attributes, and
        # because the list was mutated in place every worker appended to the
        # same shared list, so model names leaked from one message into the
        # next.
        self.output_model_list = []
        self.output_message_dictionary = {}
        self.method = None
        self.properties = None
        self.body_in = None
        self.exception = None

    def process(self, method, properties, body_in):
        """Store the message to be handled by ``run()``.

        Args:
            method: delivery information of the message (the caller later
                uses ``method.delivery_tag`` to acknowledge it).
            properties: message properties, kept so the caller can reuse them.
            body_in: message body; a JSON document parsed in ``run()``.
        """
        self.method = method
        self.properties = properties
        self.body_in = body_in
        self.exception = None

    @staticmethod
    def _run_wget(url, target_dir, label):
        """Download ``url`` into ``target_dir`` with wget, without a shell.

        The command is passed as an argument list so that nothing in the URL
        can be interpreted by a shell.  A non-zero wget exit status is logged
        but not raised, matching the previous behaviour of ignoring the
        command's result.
        """
        # local import: the file's import block is not fully visible here
        import subprocess

        # "--" stops option parsing so a URL starting with "-" stays a URL
        wget_argv = ["wget", "--", url]
        print("about to run " + label + " download command: " + " ".join(wget_argv))
        completed = subprocess.run(wget_argv, cwd=target_dir, check=False)
        if completed.returncode != 0:
            logging.warning("wget exited with status %s for %s", completed.returncode, url)
        print("finished " + label + " download command")

    def run(self):
        """Parse the stored message, download its files, and record the results.

        Any exception is logged and stored in ``self.exception`` rather than
        propagated, so the caller can inspect it after the thread ends.
        """
        try:
            CDR_catalog = json.loads(self.body_in)
            print("about to print catalog")
            print(CDR_catalog)
            print("finished catalog")

            # grabbing relevant portions of the incoming message.
            tif_file_URL = CDR_catalog['cog_url']
            maparea_file_URL = CDR_catalog['map_data']
            CDR_model_list = CDR_catalog['models']
            my_cog_id = CDR_catalog['cog_id']

            # figure out filenames and paths for everything

            # we split the target into intermediate directories to avoid
            # pileup of tons of entities in a single directory; this is for
            # scaling to a parallel filesystem with many, many requests.
            split_path = os.path.join(my_cog_id[0:2], my_cog_id[2:4])
            relative_file_location = os.path.join(split_path, my_cog_id)
            # this is where the data directory is mounted inside the image
            data_location_base = "data"

            # the total path in the image where the data is referenced from
            external_data_path = os.path.join(data_location_base, relative_file_location)
            tif_data_file_name = my_cog_id + ".cog.tif"
            maparea_data_file_name = my_cog_id + ".map_data.json"

            # the paths reported downstream for the files we (potentially) download
            tif_filename_with_path = os.path.join(external_data_path, tif_data_file_name)
            maparea_filename_with_path = os.path.join(external_data_path, maparea_data_file_name)

            # if the incoming message specified a maparea file, get it
            if maparea_file_URL:
                download_dir = os.path.join(my_data_dir, external_data_path)
                os.makedirs(download_dir, exist_ok=True)
                # remove any earlier copy first: there's no clean way to tell
                # wget to overwrite files, so this guarantees that the file
                # is the newest downloaded one.
                try:
                    os.remove(os.path.join(download_dir, maparea_data_file_name))
                except FileNotFoundError:
                    pass
                self._run_wget(maparea_file_URL, download_dir, "maparea")

            # if the incoming message specified an image file, get it
            if tif_file_URL:
                download_dir = os.path.join(my_data_dir, external_data_path)
                # check for file before downloading; wget names the file
                # after the last component of the URL
                fetch_file_total_path = os.path.join(download_dir, tif_file_URL.split("/")[-1])
                if os.path.isfile(fetch_file_total_path):
                    print(f"File >{fetch_file_total_path}< already exists!  Skipping download.")
                else:
                    os.makedirs(download_dir, exist_ok=True)
                    self._run_wget(tif_file_URL, download_dir, "tif")

            # construct the processing order based on the incoming message
            # and what we downloaded.  For completeness, we also include the
            # entire incoming catalog for easier tracking
            outgoing_message_dictionary = CDR_catalog
            outgoing_message_dictionary["image_filename"] = tif_filename_with_path
            outgoing_message_dictionary["json_filename"] = maparea_filename_with_path
            self.output_message_dictionary = outgoing_message_dictionary
            # fresh list per worker, so nothing is shared between messages
            self.output_model_list = list(CDR_model_list)

        except RequestException as e:
            # the old message formatted an undefined `response` variable,
            # which would have raised NameError inside this handler
            logging.exception("Request error while processing download request.")
            self.exception = e
        except Exception as e:
            logging.exception("Error processing pipeline request.")
            self.exception = e



def CDR_download_callback(ch, method, properties, body):
# global my_log_dir
# global my_input_dir
Expand Down Expand Up @@ -203,15 +299,7 @@ def main(argv):

print("input file:>"+my_input_file+"<")
print("output directory:>"+my_output_dir+"<")
#print("using queue:>"+queue+"<")

# for each model_name in process_model_list:
# process_queue_list.append(process_queue_base+model_name)

# print("total processing queue list:")
# print(process_queue_list)
# print("finished printing processing queue list.")

# set up consumer
rabbitmq_uri=set_up_RMQ("~/.criticalmaas/secrets")

Expand All @@ -220,8 +308,67 @@ def main(argv):
connection = pika.BlockingConnection(parameters)
print('RabbitMQ connection succeeded!')
channel = connection.channel()
channel.queue_declare(queue=download_queue, durable=True)

# from this point, following the template from the uploader
channel.queue_declare(queue=download_main_queue, durable=True)
channel.queue_declare(queue=download_error_queue, durable=True)
# We only know for sure our incoming download queue and its error queue.
# Queues for models are defined according to incoming messages, so we
# don't pre-declare them here.

# Defining prefetch count here to be 1 means that if we're processing something but
# we haven't closed it out, and we asked for another, we get an empty response.
channel.basic_qos(prefetch_count=1)

# here's where the flow diverges from my original non-threaded downloader

# consumer iterator to pull messages off the download queue
consumer = channel.consume(queue=download_main_queue, inactivity_timeout=1)

worker = None
while True:
# grab the next message from the download queue
# We only get a real message when there is a message waiting and we've
# closed out the last one
method, properties, body = next(consumer)
if method:
# if there was a new message with real information in it,
# we fire off a worker with its contents
worker = DL_worker()
worker.process(method, properties, body)
worker.start()

if worker:
if not worker.is_alive():
# if the worker has finished its task, we grab its contents, send out
# its messages, and then close it out to the queue iterator
my_model_list = worker.output_model_list
my_output_dictionary = worker.output_message_dictionary

if worker.exception:
error_data['exception'] = repr(worker.exception)
channel.basic_publish(exchange='', routing_key=download_error_queue, body=json.dumps(error_data), properties=worker.properties)
else:
# send a process message for each model in the list
for model_to_process in my_model_list:
print(f"sending process request for model {model_to_process}")
my_process_queue = process_queue_base+model_to_process
outgoing_message=json.dumps(my_output_dictionary)
parameters = pika.URLParameters(rabbitmq_uri)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=my_process_queue, durable=True)
properties = pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent)
channel.basic_publish(exchange='', routing_key=my_process_queue, body=outgoing_message, properties=properties)
channel.basic_ack(delivery_tag=worker.method.delivery_tag)
worker = None


print ("should never get here! Exiting!")
sys.exit(2)

channel.queue_declare(queue=download_queue, durable=True)

channel.basic_consume(queue=download_queue, on_message_callback=CDR_download_callback, auto_ack=False)
# presumably this function takes us into a wait-process loop forever
print('start consumer loop')
Expand Down

0 comments on commit fd35028

Please sign in to comment.