From 51016709a12c56e50f78ffff95e74ee8081f94c0 Mon Sep 17 00:00:00 2001
From: Gal Topper
Date: Mon, 8 Jan 2024 17:09:04 +0800
Subject: [PATCH] Fix S3 and Azure test suites (#493)

[ML-5397](https://jira.iguazeng.com/browse/ML-5397)
---
 dev-requirements.txt                             |  4 +++-
 integration/test_azure_filesystem_integration.py | 15 +++++++++------
 integration/test_s3_filesystem_integration.py    | 10 +++++-----
 requirements.txt                                 |  1 +
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/dev-requirements.txt b/dev-requirements.txt
index 24183a9e..33cf8582 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -9,4 +9,6 @@ lupa~=1.13
 fakeredis~=1.9
 redis~=4.3
 # in sqlalchemy>=2.0 there is breaking changes (such as in Table class autoload argument is removed)
-sqlalchemy~=1.4
\ No newline at end of file
+sqlalchemy~=1.4
+s3fs~=2023.9.2
+adlfs~=2023.9.0
diff --git a/integration/test_azure_filesystem_integration.py b/integration/test_azure_filesystem_integration.py
index 6d04c8d8..f382a149 100644
--- a/integration/test_azure_filesystem_integration.py
+++ b/integration/test_azure_filesystem_integration.py
@@ -266,12 +266,13 @@ def test_write_to_parquet_to_azure(azure_setup_teardown_test):
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected = pd.DataFrame(expected, columns=columns, dtype="int32")
+    expected = pd.DataFrame(expected, columns=columns)
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_dir, columns=columns, storage_options=storage_options)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    read_back_df["my_int"] = read_back_df["my_int"].astype("int64")
+    pd.testing.assert_frame_equal(read_back_df, expected)
 
 
 @pytest.mark.skipif(not has_azure_credentials, reason="No azure credentials found")
@@ -291,12 +292,13 @@ def test_write_to_parquet_to_azure_single_file_on_termination(
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_file, columns=columns, storage_options=storage_options)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    read_back_df["my_int"] = read_back_df["my_int"].astype("int64")
+    pd.testing.assert_frame_equal(read_back_df, expected)
 
 
 @pytest.mark.skipif(not has_azure_credentials, reason="No azure credentials found")
@@ -319,10 +321,11 @@ def test_write_to_parquet_to_azure_with_indices(azure_setup_teardown_test):
         controller.emit([i, f"this is {i}"], key=f"key{i}")
         expected.append([f"key{i}", i, f"this is {i}"])
     columns = ["event_key", "my_int", "my_string"]
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     expected.set_index(["event_key"], inplace=True)
     controller.terminate()
     controller.await_termination()
 
     read_back_df = pd.read_parquet(out_file, columns=columns, storage_options=storage_options)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    read_back_df["my_int"] = read_back_df["my_int"].astype("int64")
+    pd.testing.assert_frame_equal(read_back_df, expected)
diff --git a/integration/test_s3_filesystem_integration.py b/integration/test_s3_filesystem_integration.py
index 82c73e49..e3478f13 100644
--- a/integration/test_s3_filesystem_integration.py
+++ b/integration/test_s3_filesystem_integration.py
@@ -230,7 +230,7 @@ def test_write_csv_from_lists_with_metadata_and_column_pruning_to_s3(s3_teardown
 
 @pytest.mark.skipif(not has_s3_credentials, reason="No s3 credentials found")
 def test_write_to_parquet_to_s3(s3_setup_teardown_test):
-    out_dir = f"s3://{s3_setup_teardown_test}/"
+    out_dir = f"s3://{s3_setup_teardown_test}"
     columns = ["my_int", "my_string"]
     controller = build_flow(
         [
@@ -254,7 +254,7 @@ def test_write_to_parquet_to_s3(s3_setup_teardown_test):
 
 @pytest.mark.skipif(not has_s3_credentials, reason="No s3 credentials found")
 def test_write_to_parquet_to_s3_single_file_on_termination(s3_setup_teardown_test):
-    out_file = f"s3://{s3_setup_teardown_test}/myfile.pq"
+    out_file = f"s3://{s3_setup_teardown_test}myfile.pq"
     columns = ["my_int", "my_string"]
     controller = build_flow([SyncEmitSource(), ParquetTarget(out_file, columns=columns)]).run()
 
@@ -262,7 +262,7 @@ def test_write_to_parquet_to_s3_single_file_on_termination(s3_setup_teardown_tes
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     controller.terminate()
     controller.await_termination()
 
@@ -272,7 +272,7 @@ def test_write_to_parquet_to_s3_single_file_on_termination(s3_setup_teardown_tes
 
 @pytest.mark.skipif(not has_s3_credentials, reason="No s3 credentials found")
 def test_write_to_parquet_to_s3_with_indices(s3_setup_teardown_test):
-    out_file = f"s3://{s3_setup_teardown_test}/test_write_to_parquet_with_indices{uuid.uuid4().hex}/"
+    out_file = f"s3://{s3_setup_teardown_test}test_write_to_parquet_with_indices{uuid.uuid4().hex}/"
     controller = build_flow(
         [
             SyncEmitSource(),
@@ -285,7 +285,7 @@ def test_write_to_parquet_to_s3_with_indices(s3_setup_teardown_test):
         controller.emit([i, f"this is {i}"], key=f"key{i}")
         expected.append([f"key{i}", i, f"this is {i}"])
     columns = ["event_key", "my_int", "my_string"]
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     expected.set_index(["event_key"], inplace=True)
     controller.terminate()
     controller.await_termination()
diff --git a/requirements.txt b/requirements.txt
index 9bc9fa73..111ff3b5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,6 +8,7 @@ numpy>=1.16.5,<1.27
 # <15 is just a safeguard - no tests performed with pyarrow higher than 14
 pyarrow>=1,<15
 v3io-frames~=0.10.9
+fsspec>=0.6.2
 v3iofs~=0.1.17
 xxhash>=1
 nuclio-sdk>=0.5.3