Add read_parquet_metadata to pylibcudf #17245

Merged
@@ -19,4 +19,5 @@ I/O Functions
     csv
     json
     parquet
+    parquet_metadata
     timezone
@@ -0,0 +1,6 @@
+================
+Parquet Metadata
+================
+
+.. automodule:: pylibcudf.io.parquet_metadata
+    :members:
1 change: 0 additions & 1 deletion python/cudf/cudf/_lib/io/utils.pxd
@@ -13,7 +13,6 @@ from pylibcudf.libcudf.io.types cimport (
 from cudf._lib.column cimport Column


-cdef source_info make_source_info(list src) except*
 cdef sink_info make_sinks_info(
     list src, vector[unique_ptr[data_sink]] & data) except*
 cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except*
56 changes: 0 additions & 56 deletions python/cudf/cudf/_lib/io/utils.pyx
@@ -7,76 +7,20 @@ from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector

-from pylibcudf.io.datasource cimport Datasource
 from pylibcudf.libcudf.io.data_sink cimport data_sink
-from pylibcudf.libcudf.io.datasource cimport datasource
 from pylibcudf.libcudf.io.types cimport (
     column_name_info,
-    host_buffer,
     sink_info,
-    source_info,
 )

 from cudf._lib.column cimport Column

 import codecs
-import errno
 import io
 import os

 from cudf.core.dtypes import StructDtype

-
-# Converts the Python source input to libcudf IO source_info
-# with the appropriate type and source values
-cdef source_info make_source_info(list src) except*:
-    if not src:
-        raise ValueError("Need to pass at least one source")
-
-    cdef const unsigned char[::1] c_buffer
-    cdef vector[host_buffer] c_host_buffers
-    cdef vector[string] c_files
-    cdef Datasource csrc
-    cdef vector[datasource*] c_datasources
-    empty_buffer = False
-    if isinstance(src[0], bytes):
-        empty_buffer = True
-        for buffer in src:
-            if (len(buffer) > 0):
-                c_buffer = buffer
-                c_host_buffers.push_back(host_buffer(<char*>&c_buffer[0],
-                                                     c_buffer.shape[0]))
-                empty_buffer = False
-    elif isinstance(src[0], io.BytesIO):
-        for bio in src:
-            c_buffer = bio.getbuffer()  # check if empty?
-            c_host_buffers.push_back(host_buffer(<char*>&c_buffer[0],
-                                                 c_buffer.shape[0]))
-    # Otherwise src is expected to be a numeric fd, string path, or PathLike.
-    # TODO (ptaylor): Might need to update this check if accepted input types
-    # change when UCX and/or cuStreamz support is added.
-    elif isinstance(src[0], Datasource):
-        for csrc in src:
-            c_datasources.push_back(csrc.get_datasource())
-        return source_info(c_datasources)
-    elif isinstance(src[0], (int, float, complex, basestring, os.PathLike)):
-        # If source is a file, return source_info where type=FILEPATH
-        if not all(os.path.isfile(file) for file in src):
-            raise FileNotFoundError(errno.ENOENT,
-                                    os.strerror(errno.ENOENT),
-                                    src)
-
-        files = [<string> str(elem).encode() for elem in src]
-        c_files = files
-        return source_info(c_files)
-    else:
-        raise TypeError("Unrecognized input type: {}".format(type(src[0])))
-
-    if empty_buffer is True:
-        c_host_buffers.push_back(host_buffer(<char*>NULL, 0))
-
-    return source_info(c_host_buffers)

 # Converts the Python sink input to libcudf IO sink_info.
 cdef sink_info make_sinks_info(
     list src, vector[unique_ptr[data_sink]] & sink
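Note: the deleted helper's dispatch over bytes, io.BytesIO, Datasource objects, and file paths now lives behind pylibcudf's SourceInfo, which the rewritten caller below uses directly. A minimal sketch of the replacement pattern, assuming pylibcudf is importable as plc; the file name and byte strings are placeholders, not files from this PR:

import io
import pylibcudf as plc

# Each list element may be a filesystem path, a bytes object, or a BytesIO
# buffer; SourceInfo performs the type dispatch make_source_info used to do.
src_from_paths = plc.io.SourceInfo(["data.parquet"])           # hypothetical file
src_from_bytes = plc.io.SourceInfo([b"<parquet bytes>"])       # placeholder bytes
src_from_bytesio = plc.io.SourceInfo([io.BytesIO(b"<parquet bytes>")])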
67 changes: 22 additions & 45 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -27,7 +27,6 @@ from libcpp cimport bool
 from libcpp.map cimport map
 from libcpp.memory cimport make_unique, unique_ptr
 from libcpp.string cimport string
-from libcpp.unordered_map cimport unordered_map
 from libcpp.utility cimport move
 from libcpp.vector cimport vector

@@ -41,12 +40,7 @@ from pylibcudf.libcudf.io.parquet cimport (
     parquet_writer_options,
     write_parquet as parquet_writer,
 )
-from pylibcudf.libcudf.io.parquet_metadata cimport (
-    parquet_metadata,
-    read_parquet_metadata as parquet_metadata_reader,
-)
 from pylibcudf.libcudf.io.types cimport (
-    source_info,
     sink_info,
     column_in_metadata,
     table_input_metadata,
@@ -62,7 +56,6 @@ from cudf._lib.column cimport Column
 from cudf._lib.io.utils cimport (
     add_df_col_struct_names,
     make_sinks_info,
-    make_source_info,
 )
 from cudf._lib.utils cimport table_view_from_table

@@ -373,7 +366,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
                            nrows=nrows, skip_rows=skip_rows)
     return df

-cpdef read_parquet_metadata(filepaths_or_buffers):
+cpdef read_parquet_metadata(list filepaths_or_buffers):
     """
     Cython function to call into libcudf API, see `read_parquet_metadata`.

@@ -382,56 +375,40 @@ cpdef read_parquet_metadata(filepaths_or_buffers):
     cudf.io.parquet.read_parquet
     cudf.io.parquet.to_parquet
     """
-    cdef source_info source = make_source_info(filepaths_or_buffers)
-
-    args = move(source)
-
-    cdef parquet_metadata c_result
-
-    # Read Parquet metadata
-    with nogil:
-        c_result = move(parquet_metadata_reader(args))
-
-    # access and return results
-    num_rows = c_result.num_rows()
-    num_rowgroups = c_result.num_rowgroups()
-
-    # extract row group metadata and sanitize keys
-    row_group_metadata = [{k.decode(): v for k, v in metadata}
-                          for metadata in c_result.rowgroup_metadata()]
+    parquet_metadata = plc.io.parquet_metadata.read_parquet_metadata(
+        plc.io.SourceInfo(filepaths_or_buffers)
+    )

     # read all column names including index column, if any
-    col_names = [info.name().decode() for info in c_result.schema().root().children()]
-
-    # access the Parquet file_footer to find the index
-    index_col = None
-    cdef unordered_map[string, string] file_footer = c_result.metadata()
+    col_names = [info.name() for info in parquet_metadata.schema().root().children()]

-    # get index column name(s)
-    index_col_names = None
-    json_str = file_footer[b'pandas'].decode('utf-8')
-    meta = None
+    index_col_names = set()
+    json_str = parquet_metadata.metadata()[b'pandas'].decode('utf-8')
     if json_str != "":
         meta = json.loads(json_str)
         file_is_range_index, index_col, _ = _parse_metadata(meta)
-        if not file_is_range_index and index_col is not None \
-                and index_col_names is None:
-            index_col_names = {}
+        if (
+            not file_is_range_index
+            and index_col is not None
+        ):
+            columns = meta['columns']
             for idx_col in index_col:
-                for c in meta['columns']:
+                for c in columns:
                     if c['field_name'] == idx_col:
-                        index_col_names[idx_col] = c['name']
+                        index_col_names.add(idx_col)

     # remove the index column from the list of column names
     # only if index_col_names is not None
-    if index_col_names is not None:
+    if len(index_col_names) >= 0:
         col_names = [name for name in col_names if name not in index_col_names]

-    # num_columns = length of list(col_names)
-    num_columns = len(col_names)
-
-    # return the metadata
-    return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata
+    return (
+        parquet_metadata.num_rows(),
+        parquet_metadata.num_rowgroups(),
+        col_names,
+        len(col_names),
+        parquet_metadata.rowgroup_metadata()
+    )


 @acquire_spill_lock()
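For orientation, the rewritten function's pylibcudf calls compose the same way outside of Cython. A sketch using only the accessors exercised above; the file name is hypothetical and the pandas index handling is elided:

import pylibcudf as plc

meta = plc.io.parquet_metadata.read_parquet_metadata(
    plc.io.SourceInfo(["example.parquet"])  # hypothetical path
)
# The same accessors the Cython code calls above.
col_names = [child.name() for child in meta.schema().root().children()]
print(meta.num_rows(), meta.num_rowgroups(), col_names)
print(meta.rowgroup_metadata())  # per-row-group metadata as a list of dicts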
4 changes: 2 additions & 2 deletions python/cudf/cudf/tests/test_parquet.py
@@ -405,14 +405,14 @@ def test_parquet_range_index_pandas_metadata(tmpdir, pandas_compat, as_bytes):
     assert_eq(expect, got)


-def test_parquet_read_metadata(tmpdir, pdf):
+def test_parquet_read_metadata(tmp_path, pdf):
     if len(pdf) > 100:
         pytest.skip("Skipping long setup test")

     def num_row_groups(rows, group_size):
         return max(1, (rows + (group_size - 1)) // group_size)

-    fname = tmpdir.join("metadata.parquet")
+    fname = tmp_path / "metadata.parquet"
     row_group_size = 5
     pdf.to_parquet(fname, compression="snappy", row_group_size=row_group_size)

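The num_row_groups helper in this test is ceiling division clamped to at least one group. A quick sanity check of the arithmetic, using the test's row_group_size of 5:

def num_row_groups(rows, group_size):
    # Ceiling division via (rows + group_size - 1) // group_size,
    # clamped so the expected count is never below one row group.
    return max(1, (rows + (group_size - 1)) // group_size)

assert num_row_groups(100, 5) == 20  # exact multiple of the group size
assert num_row_groups(101, 5) == 21  # partial trailing group rounds up
assert num_row_groups(0, 5) == 1     # clamp keeps the minimum at one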
4 changes: 2 additions & 2 deletions python/pylibcudf/pylibcudf/io/CMakeLists.txt
@@ -12,8 +12,8 @@
 # the License.
 # =============================================================================

-set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx timezone.pyx
-    types.pyx
+set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx
+    parquet_metadata.pyx timezone.pyx types.pyx
 )

 set(linked_libraries cudf::cudf)
2 changes: 1 addition & 1 deletion python/pylibcudf/pylibcudf/io/__init__.pxd
@@ -1,5 +1,5 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.

 # CSV is removed since it is def not cpdef (to force kw-only arguments)
-from . cimport avro, datasource, json, orc, parquet, timezone, types
+from . cimport avro, datasource, json, orc, parquet, parquet_metadata, timezone, types
 from .types cimport SourceInfo, TableWithMetadata
12 changes: 11 additions & 1 deletion python/pylibcudf/pylibcudf/io/__init__.py
@@ -1,4 +1,14 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.

-from . import avro, csv, datasource, json, orc, parquet, timezone, types
+from . import (
+    avro,
+    csv,
+    datasource,
+    json,
+    orc,
+    parquet,
+    parquet_metadata,
+    timezone,
+    types,
+)
 from .types import SinkInfo, SourceInfo, TableWithMetadata
51 changes: 51 additions & 0 deletions python/pylibcudf/pylibcudf/io/parquet_metadata.pxd
@@ -0,0 +1,51 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from pylibcudf.io.types cimport SourceInfo
+from pylibcudf.libcudf.io.parquet_metadata cimport(
+    parquet_metadata,
+    parquet_schema,
+    parquet_column_schema,
+)
+
+cdef class ParquetColumnSchema:
+    cdef parquet_column_schema column_schema
+
+    @staticmethod
+    cdef from_column_schema(parquet_column_schema column_schema)
+
+    cpdef str name(self)
+
+    cpdef int num_children(self)
+
+    cpdef ParquetColumnSchema child(self, int idx)
+
+    cpdef list children(self)
+
+
+cdef class ParquetSchema:
+    cdef parquet_schema schema
+
+    @staticmethod
+    cdef from_schema(parquet_schema schema)
+
+    cpdef ParquetColumnSchema root(self)
+
+
+cdef class ParquetMetadata:
+    cdef parquet_metadata meta
+
+    @staticmethod
+    cdef from_metadata(parquet_metadata meta)
+
+    cpdef ParquetSchema schema(self)
+
+    cpdef int num_rows(self)
+
+    cpdef int num_rowgroups(self)
+
+    cpdef dict metadata(self)
+
+    cpdef list rowgroup_metadata(self)
+
+
+cpdef ParquetMetadata read_parquet_metadata(SourceInfo src_info)
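These declarations wrap libcudf's Parquet schema tree. A sketch of walking it from Python, using only the accessors declared in this file; the recursive helper and the file name are illustrative assumptions, not part of the PR:

import pylibcudf as plc

def print_schema(col, depth=0):
    # ParquetColumnSchema exposes name(), num_children(), and child(idx).
    print("  " * depth + col.name())
    for idx in range(col.num_children()):
        print_schema(col.child(idx), depth + 1)

meta = plc.io.parquet_metadata.read_parquet_metadata(
    plc.io.SourceInfo(["nested.parquet"])  # hypothetical path
)
print_schema(meta.schema().root())  # the root's children are the top-level columns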