Skip to content

Commit

Permalink
ParquetTarget: Ignore batching parameters in single file mode (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper authored Dec 20, 2023
1 parent 3677bc4 commit 63db8a5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
7 changes: 3 additions & 4 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,9 @@ def __init__(
else:
kwargs["partition_cols"] = partition_cols

if max_events is None and not self._single_file_mode:
max_events = 10000
if flush_after_seconds is None and not self._single_file_mode:
flush_after_seconds = 60
if self._single_file_mode:
max_events = None
flush_after_seconds = None

kwargs["path"] = path
if not self._single_file_mode and path.endswith("/"):
Expand Down
8 changes: 5 additions & 3 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2601,7 +2601,10 @@ def test_write_sparse_data_to_parquet(tmpdir):
def test_write_to_parquet_single_file_on_termination(tmpdir):
out_file = f"{tmpdir}/test_write_to_parquet_single_file_on_termination_{uuid.uuid4().hex}/out.parquet"
columns = ["my_int", "my_string"]
controller = build_flow([SyncEmitSource(), ParquetTarget(out_file, columns=columns)]).run()
# ML-5119 – make sure max_events and flush_after_seconds are ignored
controller = build_flow(
[SyncEmitSource(), ParquetTarget(out_file, columns=columns, max_events=1, flush_after_seconds=1)]
).run()

expected = []
for i in range(10):
Expand Down Expand Up @@ -3555,7 +3558,6 @@ def test_flow_to_dict_write_to_parquet():
"path": "outdir",
"columns": ["col1", "col2"],
"max_events": 2,
"flush_after_seconds": 60,
},
"name": "ParquetTarget",
}
Expand Down Expand Up @@ -3670,7 +3672,7 @@ def test_reader_writer_to_code():
reconstructed_code = flow.to_code()
print(reconstructed_code)
expected = """c_s_v_source0 = CSVSource(paths='mycsv.csv', header=True, build_dict=False, type_inference=True)
parquet_target0 = ParquetTarget(path='mypq', max_events=10000, flush_after_seconds=60)
parquet_target0 = ParquetTarget(path='mypq')
c_s_v_source0.to(parquet_target0)
"""
Expand Down

0 comments on commit 63db8a5

Please sign in to comment.