Fix to_parquet append behavior with global metadata file (#17198)
Closes #17177

When appending to a parquet dataset with Dask cuDF, the original metadata must be converted from `pq.FileMetaData` to `bytes` before it can be passed down to `cudf.io.merge_parquet_filemetadata`.
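
For reference, here is a small, self-contained sketch of that conversion outside of dask_cudf. The file names, the `fmd_to_bytes` helper, and the example data are illustrative assumptions; only the `BytesIO`/`np.frombuffer` conversion and the final `cudf.io.merge_parquet_filemetadata` call mirror the change made in `write_metadata` below.

```python
# Hedged sketch: everything except the BytesIO/np.frombuffer conversion and
# the merge_parquet_filemetadata call is illustrative scaffolding.
from io import BytesIO

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

import cudf


def fmd_to_bytes(fmd: pq.FileMetaData) -> np.ndarray:
    # The conversion applied by the fix: serialize the footer into a buffer
    # and reload it as a uint8 array.
    with BytesIO() as buf:
        fmd.write_metadata_file(buf)
        buf.seek(0)
        return np.frombuffer(buf.read(), dtype="uint8")


# Stand-in for an existing dataset: one part file plus a global _metadata.
table = pa.table({"a": [1, 2, 3]})
pq.write_table(table, "part.0.parquet")
part0 = pq.read_metadata("part.0.parquet")
part0.set_file_path("part.0.parquet")
pq.write_metadata(table.schema, "_metadata", metadata_collector=[part0])

# On append, the existing global footer comes back as a pq.FileMetaData
# object ...
existing_fmd = pq.read_metadata("_metadata")

# ... and a newly appended part contributes its own footer.
pq.write_table(pa.table({"a": [4, 5, 6]}), "part.1.parquet")
part1 = pq.read_metadata("part.1.parquet")
part1.set_file_path("part.1.parquet")

# With every entry converted to bytes, the footers can be merged.
merged = cudf.io.merge_parquet_filemetadata(
    [fmd_to_bytes(existing_fmd), fmd_to_bytes(part1)]
)
```

Without the conversion, the list handed to `cudf.io.merge_parquet_filemetadata` mixes a `pq.FileMetaData` object with bytes footers, which is what #17177 reported.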

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #17198
rjzamora authored Oct 30, 2024
1 parent 7157de7 commit 5a6d177
Showing 2 changed files with 26 additions and 0 deletions.
6 changes: 6 additions & 0 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -383,6 +383,12 @@ def write_metadata(parts, fmd, fs, path, append=False, **kwargs):
    metadata_path = fs.sep.join([path, "_metadata"])
    _meta = []
    if append and fmd is not None:
        # Convert to bytes: <https://github.com/rapidsai/cudf/issues/17177>
        if isinstance(fmd, pq.FileMetaData):
            with BytesIO() as myio:
                fmd.write_metadata_file(myio)
                myio.seek(0)
                fmd = np.frombuffer(myio.read(), dtype="uint8")
        _meta = [fmd]
    _meta.extend([parts[i][0]["meta"] for i in range(len(parts))])
    _meta = (
20 changes: 20 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -644,3 +644,23 @@ def test_read_parquet_arrow_filesystem(tmpdir, min_part_size):
    dd.assert_eq(df, ddf, check_index=False)
    assert isinstance(ddf._meta, cudf.DataFrame)
    assert isinstance(ddf.compute(), cudf.DataFrame)


@pytest.mark.parametrize("write_metadata_file", [True, False])
def test_to_parquet_append(tmpdir, write_metadata_file):
    df = cudf.DataFrame({"a": [1, 2, 3]})
    ddf = dask_cudf.from_cudf(df, npartitions=1)
    ddf.to_parquet(
        tmpdir,
        append=True,
        write_metadata_file=write_metadata_file,
        write_index=False,
    )
    ddf.to_parquet(
        tmpdir,
        append=True,
        write_metadata_file=write_metadata_file,
        write_index=False,
    )
    ddf2 = dask_cudf.read_parquet(tmpdir)
    dd.assert_eq(cudf.concat([df, df]), ddf2)
