Skip to content

Commit

Permalink
logging an error message for invalid files while loading (georgia-tec…
Browse files Browse the repository at this point in the history
…h-db#1334)

Issue - [721](georgia-tech-db#721)

Currently, we abort the entire process when the load executor encounters
a corrupted file.
  • Loading branch information
rohithmulumudy authored Nov 6, 2023
1 parent 35b772b commit 64219f1
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 41 deletions.
2 changes: 1 addition & 1 deletion evadb/executor/load_multimedia_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def exec(self, *args, **kwargs):

invalid_files_str = "\n".join(invalid_files)
err_msg = f"no valid file found at -- '{invalid_files_str}'."
raise ValueError(err_msg)
logger.error(err_msg)

# Get valid files.
valid_files = [
Expand Down
153 changes: 113 additions & 40 deletions test/integration_tests/long/test_load_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,16 @@ def test_should_fail_to_load_invalid_files_as_video(self):
self.evadb, "SELECT name FROM MyVideos", do_not_print_exceptions=True
)

def test_should_rollback_if_video_load_fails(self):
def test_should_rollback_or_skip_if_video_load_fails(self):
path_regex = Path(f"{EvaDB_ROOT_DIR}/data/sample_videos/1/*.mp4")
valid_videos = glob.glob(str(path_regex.expanduser()), recursive=True)

tempfile_name = os.urandom(24).hex()
tempfile_path = os.path.join(tempfile.gettempdir(), tempfile_name)
with open(tempfile_path, "wb") as empty_file:
# Load one correct file and one empty file
# Load one empty file
# nothing should be added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_videos[0]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD VIDEO "{path}" INTO MyVideos;"""
Expand All @@ -233,24 +232,32 @@ def test_should_rollback_if_video_load_fails(self):
do_not_print_exceptions=True,
)

# Load one correct file and one empty file
# one file should get added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_videos[0]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD VIDEO "{path}" INTO MyVideos;"""
result = execute_query_fetch_all(self.evadb, query)
expected = Batch(
pd.DataFrame([f"Number of loaded {FileFormatType.VIDEO.name}: 1"])
)
self.assertEqual(result, expected)

# Load two correct files and one empty file
# nothing should be added
# two files should get added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_videos[0]), tmp_dir)
shutil.copy2(str(valid_videos[1]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD VIDEO "{path}" INTO MyVideos;"""
with self.assertRaises(Exception):
execute_query_fetch_all(
self.evadb, query, do_not_print_exceptions=True
)
with self.assertRaises(BinderError):
execute_query_fetch_all(
self.evadb,
"SELECT name FROM MyVideos",
do_not_print_exceptions=True,
)
result = execute_query_fetch_all(self.evadb, query)
expected = Batch(
pd.DataFrame([f"Number of loaded {FileFormatType.VIDEO.name}: 2"])
)
self.assertEqual(result, expected)

def test_should_rollback_and_preserve_previous_state(self):
path_regex = Path(f"{EvaDB_ROOT_DIR}/data/sample_videos/1/*.mp4")
Expand All @@ -262,13 +269,12 @@ def test_should_rollback_and_preserve_previous_state(self):
self.evadb, f"""LOAD VIDEO "{load_file}" INTO MyVideos;"""
)

# Load one correct file and one empty file
# original file should remain
tempfile_name = os.urandom(24).hex()
tempfile_path = os.path.join(tempfile.gettempdir(), tempfile_name)
with open(tempfile_path, "wb") as empty_file:
# Load one empty file
# original file should remain
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_videos[1]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD VIDEO "{path}" INTO MyVideos;"""
Expand All @@ -282,6 +288,25 @@ def test_should_rollback_and_preserve_previous_state(self):
file_names = np.unique(result.frames)
self.assertEqual(len(file_names), 1)

# Load one correct file and one empty file
# original file should remain and the correct file should get added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_videos[1]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD VIDEO "{path}" INTO MyVideos;"""
result = execute_query_fetch_all(self.evadb, query)
expected = Batch(
pd.DataFrame([f"Number of loaded {FileFormatType.VIDEO.name}: 1"])
)
self.assertEqual(result, expected)

result = execute_query_fetch_all(
self.evadb, "SELECT name FROM MyVideos"
)
file_names = np.unique(result.frames)
self.assertEqual(len(file_names), 2)

###########################################
# integration testcases for load image

Expand Down Expand Up @@ -342,18 +367,17 @@ def test_should_fail_to_load_invalid_files_as_image(self):
self.evadb, "SELECT name FROM MyImages;", do_not_print_exceptions=True
)

def test_should_rollback_if_image_load_fails(self):
def test_should_rollback_or_pass_if_image_load_fails(self):
valid_images = glob.glob(
str(self.image_files_path.expanduser()), recursive=True
)

tempfile_name = os.urandom(24).hex()
tempfile_path = os.path.join(tempfile.gettempdir(), tempfile_name)
with open(tempfile_path, "wb") as empty_file:
# Load one correct file and one empty file
# Load one empty file
# nothing should be added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_images[0]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD IMAGE "{path}" INTO MyImages;"""
Expand All @@ -368,26 +392,34 @@ def test_should_rollback_if_image_load_fails(self):
do_not_print_exceptions=True,
)

# Load one correct file and one empty file
# correct file should be added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_images[0]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD IMAGE "{path}" INTO MyImages;"""
result = execute_query_fetch_all(self.evadb, query)
expected = Batch(
pd.DataFrame([f"Number of loaded {FileFormatType.IMAGE.name}: 1"])
)
self.assertEqual(result, expected)

# Load two correct files and one empty file
# nothing should be added
# two correct files should be added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_images[0]), tmp_dir)
shutil.copy2(str(valid_images[1]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD IMAGE "{path}" INTO MyImages;"""
with self.assertRaises(Exception):
execute_query_fetch_all(
self.evadb, query, do_not_print_exceptions=True
)
with self.assertRaises(BinderError):
execute_query_fetch_all(
self.evadb,
"SELECT name FROM MyImages;",
do_not_print_exceptions=True,
)
result = execute_query_fetch_all(self.evadb, query)
expected = Batch(
pd.DataFrame([f"Number of loaded {FileFormatType.IMAGE.name}: 2"])
)
self.assertEqual(result, expected)

def test_should_rollback_and_preserve_previous_state_for_load_images(self):
def test_should_rollback_or_pass_and_preserve_previous_state_for_load_images(self):
valid_images = glob.glob(
str(self.image_files_path.expanduser()), recursive=True
)
Expand All @@ -397,13 +429,12 @@ def test_should_rollback_and_preserve_previous_state_for_load_images(self):
self.evadb, f"""LOAD IMAGE "{valid_images[0]}" INTO MyImages;"""
)

# Load one correct file and one empty file
# original file should remain
tempfile_name = os.urandom(24).hex()
tempfile_path = os.path.join(tempfile.gettempdir(), tempfile_name)
with open(tempfile_path, "wb") as empty_file:
# Load one empty file
# original file should remain
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_images[1]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD IMAGE "{path}" INTO MyImages;"""
Expand All @@ -418,6 +449,37 @@ def test_should_rollback_and_preserve_previous_state_for_load_images(self):
expected = Batch(pd.DataFrame([{"myimages.name": valid_images[0]}]))
self.assertEqual(expected, result)

# Load one empty and one correct file
# original file should remain and the correct file should get added
with tempfile.TemporaryDirectory() as tmp_dir:
shutil.copy2(str(valid_images[1]), tmp_dir)
shutil.copy2(str(empty_file.name), tmp_dir)
path = Path(tmp_dir) / "*"
query = f"""LOAD IMAGE "{path}" INTO MyImages;"""
result = execute_query_fetch_all(self.evadb, query)
expected = Batch(
pd.DataFrame([f"Number of loaded {FileFormatType.IMAGE.name}: 1"])
)
self.assertEqual(result, expected)

result = execute_query_fetch_all(
self.evadb, "SELECT name FROM MyImages"
)
self.assertEqual(len(result), 2)
expected = Batch(
pd.DataFrame(
[
{"myimages.name": valid_images[0]},
{
"myimages.name": os.path.join(
tmp_dir, os.path.basename(valid_images[1])
)
},
]
)
)
self.assertEqual(expected, result)

###################################
# integration tests for csv
def test_should_load_csv_with_columns_in_table(self):
Expand Down Expand Up @@ -471,7 +533,7 @@ def test_should_use_parallel_load(self):
# Clean up large scale image directory.
shutil.rmtree(large_scale_image_files_path)

def test_parallel_load_should_raise_exception(self):
def test_parallel_load_should_raise_exception_or_pass(self):
# Create images.
large_scale_image_files_path = create_large_scale_image_dataset(
mp.cpu_count() * 10
Expand All @@ -481,11 +543,22 @@ def test_parallel_load_should_raise_exception(self):
with open(os.path.join(large_scale_image_files_path, "img0.jpg"), "w") as f:
f.write("aa")

with self.assertRaises(ExecutorError):
load_query = f"LOAD IMAGE '{large_scale_image_files_path}/**/*.jpg' INTO MyLargeScaleImages;"
execute_query_fetch_all(
self.evadb, load_query, do_not_print_exceptions=True
load_query = f"LOAD IMAGE '{large_scale_image_files_path}/**/*.jpg' INTO MyLargeScaleImages;"
result = execute_query_fetch_all(self.evadb, load_query)

file_count = len(
[
entry
for entry in os.listdir(large_scale_image_files_path)
if os.path.isfile(os.path.join(large_scale_image_files_path, entry))
]
)
expected = Batch(
pd.DataFrame(
[f"Number of loaded {FileFormatType.IMAGE.name}: {file_count-1}"]
)
)
self.assertEqual(result, expected)

drop_query = "DROP TABLE IF EXISTS MyLargeScaleImages;"
execute_query_fetch_all(self.evadb, drop_query)
Expand Down

0 comments on commit 64219f1

Please sign in to comment.