From e8ee20f6aec9f745f736a072af3da2ce8314f50e Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Mon, 13 May 2024 15:45:44 -0400 Subject: [PATCH 1/5] if-->wait race condition Putting in a CONDITIONAL wait to combat a race condition in the distributed file system. It first checks to see if the file exists. If it doesn't, it waits .1 seconds, if it still doesn't, it waits 1.0 seconds. Then it raises high holy hell about the file not existing. So if it does indeed exist, we've only incurred the cost of running a single existence check and an if, so it will only do the wait if it already HAS hit the race condition. --- uploader/uploader.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/uploader/uploader.py b/uploader/uploader.py index b0ccdd4..0e60ce1 100644 --- a/uploader/uploader.py +++ b/uploader/uploader.py @@ -5,6 +5,7 @@ import requests from requests.exceptions import RequestException import threading +import time # rabbitmq uri rabbitmq_uri = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/%2F") @@ -30,6 +31,13 @@ def run(self): data = json.loads(self.body) file = os.path.join("/output", data['cdr_output']) logging.debug(f"Uploading data for {data['cog_id']} from {file}") + if not os.path.exists(file): + time.sleep(.1) + if not os.path.exists(file): + time.sleep(1) + if not os.path.exists(file): + print(f"ERROR! File {file} does not exist for uploader even after a wait!") + raise ValueError(f"File {file} does not exist for uploader!!!") # only upload if less than certain size if os.path.getsize(file) > max_size * 1024 * 1024: # size in bytes raise ValueError(f"File {file} is larger than {max_size}MB, skipping upload.") From f8ed237785647bdf01bd69ff63a86892ee9672e4 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Mon, 13 May 2024 16:05:01 -0400 Subject: [PATCH 2/5] print -> logging.exception doing error reporting correctly via logging.exception (instead of print() ) --- uploader/uploader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uploader/uploader.py b/uploader/uploader.py index 0e60ce1..41546e5 100644 --- a/uploader/uploader.py +++ b/uploader/uploader.py @@ -36,7 +36,7 @@ def run(self): if not os.path.exists(file): time.sleep(1) if not os.path.exists(file): - print(f"ERROR! File {file} does not exist for uploader even after a wait!") + logging.exception(f"ERROR! File {file} does not exist for uploader even after a wait!") raise ValueError(f"File {file} does not exist for uploader!!!") # only upload if less than certain size if os.path.getsize(file) > max_size * 1024 * 1024: # size in bytes From 46ec7c753818f212e269a698006cdfa7f267c889 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Mon, 13 May 2024 16:40:33 -0400 Subject: [PATCH 3/5] update for 0.7.3 Proper log message for 0.7.3 --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 681551c..8ab77c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [0.7.3] - 2024-05-13 + +### Added +- Added conditional wait to combat potential parallel file system race condition when pipeline components are all running synchronously. Only waits when file that should exist doesn't exist. + ## [0.7.2] - 2024-05-03 ### Fixed From f5883624f3b6a2b772ee2349e8c4ba5e25daaa7f Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Wed, 26 Jun 2024 16:06:55 -0400 Subject: [PATCH 4/5] if if -> while not w/ counter Replacing tree if ifs (for os.path.exists for the target) with a while(not exists) loop with a counter. Makes it more flexible and more compact. Co-authored-by: Rob Kooper --- uploader/uploader.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/uploader/uploader.py b/uploader/uploader.py index 41546e5..16d980b 100644 --- a/uploader/uploader.py +++ b/uploader/uploader.py @@ -31,7 +31,12 @@ def run(self): data = json.loads(self.body) file = os.path.join("/output", data['cdr_output']) logging.debug(f"Uploading data for {data['cog_id']} from {file}") - if not os.path.exists(file): + counter = 0 + while not os.path.exists(file): + counter = counter + 1 + if counter > 2: # maybe make a variable above + raise ValueError(f"File {file} does not exist for uploader!!!") + time.sleep(1) time.sleep(.1) if not os.path.exists(file): time.sleep(1) From d81fd8e446e4b62ef6e43e26ca85e2e5217f4050 Mon Sep 17 00:00:00 2001 From: Craig P Steffen Date: Wed, 26 Jun 2024 15:32:56 -0500 Subject: [PATCH 5/5] clean up file check if tree I don't think the github patch applier got the final version of the code right. I think this is what Rob intended with his revised while loop replacing the couple of ifs I had initially written. --- uploader/uploader.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/uploader/uploader.py b/uploader/uploader.py index 16d980b..ff9c75b 100644 --- a/uploader/uploader.py +++ b/uploader/uploader.py @@ -20,6 +20,9 @@ class Worker(threading.Thread): + file_check_time_interval=.1 + file_check_max_checks=5 + def process(self, method, properties, body): self.method = method self.properties = properties @@ -31,18 +34,13 @@ def run(self): data = json.loads(self.body) file = os.path.join("/output", data['cdr_output']) logging.debug(f"Uploading data for {data['cog_id']} from {file}") + # counts number of times file doesn't exist before aborting counter = 0 while not os.path.exists(file): counter = counter + 1 - if counter > 2: # maybe make a variable above + if counter > file_check_max_checks: raise ValueError(f"File {file} does not exist for uploader!!!") - time.sleep(1) - time.sleep(.1) - if not os.path.exists(file): - time.sleep(1) - if not os.path.exists(file): - logging.exception(f"ERROR! File {file} does not exist for uploader even after a wait!") - raise ValueError(f"File {file} does not exist for uploader!!!") + time.sleep(file_check_time_interval) # only upload if less than certain size if os.path.getsize(file) > max_size * 1024 * 1024: # size in bytes raise ValueError(f"File {file} is larger than {max_size}MB, skipping upload.")