Fix S3 and Azure test suites (#493)
gtopper authored Jan 8, 2024
1 parent 12ceccf commit 5101670
Showing 4 changed files with 18 additions and 12 deletions.
dev-requirements.txt (4 changes: 3 additions & 1 deletion)
@@ -9,4 +9,6 @@ lupa~=1.13
 fakeredis~=1.9
 redis~=4.3
 # sqlalchemy>=2.0 has breaking changes (e.g. the autoload argument was removed from the Table class)
-sqlalchemy~=1.4
\ No newline at end of file
+sqlalchemy~=1.4
+s3fs~=2023.9.2
+adlfs~=2023.9.0
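s3fs and adlfs are the fsspec filesystem implementations behind "s3://" and "abfs://"/"az://" URLs, so pinning them here gives the integration tests a known-good cloud I/O stack. A minimal sketch of the mechanism they provide (the bucket name is hypothetical, not from this repo):

import fsspec
import pandas as pd

# Installing s3fs registers the "s3" protocol with fsspec; adlfs registers "abfs"/"az".
fs = fsspec.filesystem("s3")
print(fs.ls("my-test-bucket"))  # hypothetical bucket name

# pandas resolves cloud URLs through the same registry, which is why the tests
# below can hand storage_options straight to read_parquet:
df = pd.read_parquet("s3://my-test-bucket/data.parquet", storage_options={"anon": False})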
integration/test_azure_filesystem_integration.py (15 changes: 9 additions & 6 deletions)
@@ -266,12 +266,13 @@ def test_write_to_parquet_to_azure(azure_setup_teardown_test):
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected = pd.DataFrame(expected, columns=columns, dtype="int32")
+    expected = pd.DataFrame(expected, columns=columns)
     controller.terminate()
     controller.await_termination()

     read_back_df = pd.read_parquet(out_dir, columns=columns, storage_options=storage_options)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    read_back_df["my_int"] = read_back_df["my_int"].astype("int64")
+    pd.testing.assert_frame_equal(read_back_df, expected)


 @pytest.mark.skipif(not has_azure_credentials, reason="No azure credentials found")
@@ -291,12 +292,13 @@ def test_write_to_parquet_to_azure_single_file_on_termination(
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     controller.terminate()
     controller.await_termination()

     read_back_df = pd.read_parquet(out_file, columns=columns, storage_options=storage_options)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    read_back_df["my_int"] = read_back_df["my_int"].astype("int64")
+    pd.testing.assert_frame_equal(read_back_df, expected)


 @pytest.mark.skipif(not has_azure_credentials, reason="No azure credentials found")
@@ -319,10 +321,11 @@ def test_write_to_parquet_to_azure_with_indices(azure_setup_teardown_test):
         controller.emit([i, f"this is {i}"], key=f"key{i}")
         expected.append([f"key{i}", i, f"this is {i}"])
     columns = ["event_key", "my_int", "my_string"]
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     expected.set_index(["event_key"], inplace=True)
     controller.terminate()
     controller.await_termination()

     read_back_df = pd.read_parquet(out_file, columns=columns, storage_options=storage_options)
-    assert read_back_df.equals(expected), f"{read_back_df}\n!=\n{expected}"
+    read_back_df["my_int"] = read_back_df["my_int"].astype("int64")
+    pd.testing.assert_frame_equal(read_back_df, expected)
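All three Azure hunks make the same fix: instead of forcing a dtype onto the expected frame, the test normalizes the integer column read back from parquet to int64 and compares with pd.testing.assert_frame_equal, which reports exactly which column or attribute differs rather than printing two whole frames. A standalone sketch of the failure mode being fixed (values are illustrative, not from the repo):

import pandas as pd

left = pd.DataFrame({"my_int": [1, 2, 3]}, dtype="int32")   # e.g. what a parquet round trip can hand back
right = pd.DataFrame({"my_int": [1, 2, 3]}, dtype="int64")  # pandas' default integer dtype

print(left.equals(right))  # False: .equals requires matching dtypes, not just matching values

# Casting first makes the comparison well-defined, and assert_frame_equal
# raises a precise AssertionError on any remaining difference:
pd.testing.assert_frame_equal(left.astype("int64"), right)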
integration/test_s3_filesystem_integration.py (10 changes: 5 additions & 5 deletions)
@@ -230,7 +230,7 @@ def test_write_csv_from_lists_with_metadata_and_column_pruning_to_s3(s3_teardown

 @pytest.mark.skipif(not has_s3_credentials, reason="No s3 credentials found")
 def test_write_to_parquet_to_s3(s3_setup_teardown_test):
-    out_dir = f"s3://{s3_setup_teardown_test}/"
+    out_dir = f"s3://{s3_setup_teardown_test}"
     columns = ["my_int", "my_string"]
     controller = build_flow(
         [
@@ -254,15 +254,15 @@ def test_write_to_parquet_to_s3(s3_setup_teardown_test):

 @pytest.mark.skipif(not has_s3_credentials, reason="No s3 credentials found")
 def test_write_to_parquet_to_s3_single_file_on_termination(s3_setup_teardown_test):
-    out_file = f"s3://{s3_setup_teardown_test}/myfile.pq"
+    out_file = f"s3://{s3_setup_teardown_test}myfile.pq"
     columns = ["my_int", "my_string"]
     controller = build_flow([SyncEmitSource(), ParquetTarget(out_file, columns=columns)]).run()

     expected = []
     for i in range(10):
         controller.emit([i, f"this is {i}"])
         expected.append([i, f"this is {i}"])
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     controller.terminate()
     controller.await_termination()

@@ -272,7 +272,7 @@ def test_write_to_parquet_to_s3_single_file_on_termination(s3_setup_teardown_test):

 @pytest.mark.skipif(not has_s3_credentials, reason="No s3 credentials found")
 def test_write_to_parquet_to_s3_with_indices(s3_setup_teardown_test):
-    out_file = f"s3://{s3_setup_teardown_test}/test_write_to_parquet_with_indices{uuid.uuid4().hex}/"
+    out_file = f"s3://{s3_setup_teardown_test}test_write_to_parquet_with_indices{uuid.uuid4().hex}/"
     controller = build_flow(
         [
             SyncEmitSource(),
@@ -285,7 +285,7 @@ def test_write_to_parquet_to_s3_with_indices(s3_setup_teardown_test):
         controller.emit([i, f"this is {i}"], key=f"key{i}")
         expected.append([f"key{i}", i, f"this is {i}"])
     columns = ["event_key", "my_int", "my_string"]
-    expected = pd.DataFrame(expected, columns=columns, dtype="int64")
+    expected = pd.DataFrame(expected, columns=columns)
     expected.set_index(["event_key"], inplace=True)
     controller.terminate()
     controller.await_termination()
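All four S3 hunks are the same one-character fix. The s3_setup_teardown_test fixture value evidently already ends with a slash, so the old f-strings produced a double slash inside the object key; since S3 keys are plain strings, "a//b" and "a/b" name different objects, and the writer and reader can disagree about the path. A small sketch under that assumption (the fixture value shown is hypothetical):

# Hypothetical fixture value; the real one is generated by the test setup.
s3_setup_teardown_test = "storey-test-bucket/run-abc123/"

before = f"s3://{s3_setup_teardown_test}/myfile.pq"
after = f"s3://{s3_setup_teardown_test}myfile.pq"

print(before)  # s3://storey-test-bucket/run-abc123//myfile.pq  <- stray "//"
print(after)   # s3://storey-test-bucket/run-abc123/myfile.pq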
requirements.txt (1 change: 1 addition & 0 deletions)
@@ -8,6 +8,7 @@ numpy>=1.16.5,<1.27
 # <15 is just a safeguard - no tests performed with pyarrow higher than 14
 pyarrow>=1,<15
 v3io-frames~=0.10.9
+fsspec>=0.6.2
 v3iofs~=0.1.17
 xxhash>=1
 nuclio-sdk>=0.5.3
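Until now fsspec reached the environment only transitively (v3iofs and the cloud filesystem packages depend on it); listing it directly records that storey itself presumably calls the fsspec API rather than relying on an indirect install. A sketch of the kind of call that warrants a direct pin (the URL is hypothetical):

import fsspec

# url_to_fs resolves a URL's protocol to a registered filesystem implementation
# (s3fs for "s3://", adlfs for "abfs://", the local filesystem for bare paths, ...).
fs, path = fsspec.core.url_to_fs("s3://my-test-bucket/some/dir")  # hypothetical URL
print(type(fs).__name__, path)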
