From 5a6d177b259bcda647a393bc1df63f06bb26b56f Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Wed, 30 Oct 2024 14:49:50 -0500
Subject: [PATCH] Fix ``to_parquet`` append behavior with global metadata file
 (#17198)

Closes https://github.com/rapidsai/cudf/issues/17177

When appending to a parquet dataset with Dask cuDF, the original
metadata must be converted from `pq.FileMetaData` to `bytes` before it
can be passed down to `cudf.io.merge_parquet_filemetadata`.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: https://github.com/rapidsai/cudf/pull/17198
---
 python/dask_cudf/dask_cudf/io/parquet.py           |  6 ++++++
 .../dask_cudf/io/tests/test_parquet.py             | 20 +++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index a781b8242fe..39ac6474958 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -383,6 +383,12 @@ def write_metadata(parts, fmd, fs, path, append=False, **kwargs):
             metadata_path = fs.sep.join([path, "_metadata"])
             _meta = []
             if append and fmd is not None:
+                # Convert to bytes:
+                if isinstance(fmd, pq.FileMetaData):
+                    with BytesIO() as myio:
+                        fmd.write_metadata_file(myio)
+                        myio.seek(0)
+                        fmd = np.frombuffer(myio.read(), dtype="uint8")
                 _meta = [fmd]
             _meta.extend([parts[i][0]["meta"] for i in range(len(parts))])
             _meta = (
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
index ae5ca480e31..a29cf9a342a 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -644,3 +644,23 @@ def test_read_parquet_arrow_filesystem(tmpdir, min_part_size):
     dd.assert_eq(df, ddf, check_index=False)
     assert isinstance(ddf._meta, cudf.DataFrame)
     assert isinstance(ddf.compute(), cudf.DataFrame)
+
+
+@pytest.mark.parametrize("write_metadata_file", [True, False])
+def test_to_parquet_append(tmpdir, write_metadata_file):
+    df = cudf.DataFrame({"a": [1, 2, 3]})
+    ddf = dask_cudf.from_cudf(df, npartitions=1)
+    ddf.to_parquet(
+        tmpdir,
+        append=True,
+        write_metadata_file=write_metadata_file,
+        write_index=False,
+    )
+    ddf.to_parquet(
+        tmpdir,
+        append=True,
+        write_metadata_file=write_metadata_file,
+        write_index=False,
+    )
+    ddf2 = dask_cudf.read_parquet(tmpdir)
+    dd.assert_eq(cudf.concat([df, df]), ddf2)
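
Note (not part of the patch): below is a minimal standalone sketch of the
conversion this fix performs, for anyone verifying the behavior outside of
Dask cuDF. The helper name `filemetadata_to_bytes` and the commented usage
are hypothetical; `pq.FileMetaData.write_metadata_file` is the real pyarrow
API, and the uint8-buffer form expected by
`cudf.io.merge_parquet_filemetadata` comes from the patch itself.

    from io import BytesIO

    import numpy as np
    import pyarrow.parquet as pq

    def filemetadata_to_bytes(fmd: pq.FileMetaData) -> np.ndarray:
        # Serialize the pyarrow FileMetaData object into the raw uint8
        # buffer form that cudf.io.merge_parquet_filemetadata accepts
        # alongside the per-part metadata blobs.
        with BytesIO() as myio:
            fmd.write_metadata_file(myio)
            myio.seek(0)
            return np.frombuffer(myio.read(), dtype="uint8")

    # Hypothetical usage: read back an existing _metadata footer and
    # convert it before merging.
    # fmd = pq.read_metadata("dataset/_metadata")
    # buf = filemetadata_to_bytes(fmd)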