diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index f0eef9be124..f0f32831d07 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -26,6 +26,7 @@ from libc.stdint cimport uint8_t
 from libcpp cimport bool
 from libcpp.map cimport map
 from libcpp.memory cimport make_unique, unique_ptr
+from libcpp.pair cimport pair
 from libcpp.string cimport string
 from libcpp.unordered_map cimport unordered_map
 from libcpp.utility cimport move
@@ -44,6 +45,7 @@ from cudf._lib.io.utils cimport (
 )
 from cudf._lib.pylibcudf.libcudf.expressions cimport expression
 from cudf._lib.pylibcudf.libcudf.io.parquet cimport (
+    chunked_parquet_reader as cpp_chunked_parquet_reader,
     chunked_parquet_writer_options,
     merge_row_group_metadata as parquet_merge_metadata,
     parquet_chunked_writer as cpp_parquet_chunked_writer,
@@ -60,6 +62,7 @@ from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport (
 from cudf._lib.pylibcudf.libcudf.io.types cimport (
     column_in_metadata,
     table_input_metadata,
+    table_metadata,
 )
 from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view
 from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
@@ -126,50 +129,22 @@ def _parse_metadata(meta):
     return file_is_range_index, file_index_cols, file_column_dtype
 
 
-cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
-                   use_pandas_metadata=True,
-                   Expression filters=None):
-    """
-    Cython function to call into libcudf API, see `read_parquet`.
-
-    filters, if not None, should be an Expression that evaluates to a
-    boolean predicate as a function of columns being read.
-
-    See Also
-    --------
-    cudf.io.parquet.read_parquet
-    cudf.io.parquet.to_parquet
-    """
-
-    # Convert NativeFile buffers to NativeFileDatasource,
-    # but save original buffers in case we need to use
-    # pyarrow for metadata processing
-    # (See: https://github.com/rapidsai/cudf/issues/9599)
-    pa_buffers = []
-    for i, datasource in enumerate(filepaths_or_buffers):
-        if isinstance(datasource, NativeFile):
-            pa_buffers.append(datasource)
-            filepaths_or_buffers[i] = NativeFileDatasource(datasource)
+cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options(
+        cudf_io_types.source_info source,
+        vector[vector[size_type]] row_groups,
+        bool use_pandas_metadata,
+        Expression filters,
+        object columns):
 
-    cdef cudf_io_types.source_info source = make_source_info(
-        filepaths_or_buffers)
-
-    cdef bool cpp_use_pandas_metadata = use_pandas_metadata
-
-    cdef vector[vector[size_type]] cpp_row_groups
+    cdef parquet_reader_options args
+    cdef parquet_reader_options_builder builder
     cdef data_type cpp_timestamp_type = cudf_types.data_type(
         cudf_types.type_id.EMPTY
     )
-    if row_groups is not None:
-        cpp_row_groups = row_groups
-
-    # Setup parquet reader arguments
-    cdef parquet_reader_options args
-    cdef parquet_reader_options_builder builder
     builder = (
         parquet_reader_options.builder(source)
-        .row_groups(cpp_row_groups)
-        .use_pandas_metadata(cpp_use_pandas_metadata)
+        .row_groups(row_groups)
+        .use_pandas_metadata(use_pandas_metadata)
         .use_arrow_schema(True)
         .timestamp_type(cpp_timestamp_type)
     )
@@ -185,28 +160,28 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
         for col in columns:
             cpp_columns.push_back(str(col).encode())
         args.set_columns(cpp_columns)
-    # Filters don't handle the range index correctly
     allow_range_index &= filters is None
 
-    # Read Parquet
-    cdef cudf_io_types.table_with_metadata c_result
-
-    with nogil:
-        c_result = move(parquet_reader(args))
-
-    names = [info.name.decode() for info in c_result.metadata.schema_info]
-
-    # Access the Parquet per_file_user_data to find the index
+    return pair[parquet_reader_options, bool](args, allow_range_index)
+
+cdef object _process_metadata(object df,
+                              table_metadata table_meta,
+                              list names,
+                              object row_groups,
+                              object filepaths_or_buffers,
+                              list pa_buffers,
+                              bool allow_range_index,
+                              bool use_pandas_metadata):
+    update_struct_field_names(df, table_meta.schema_info)
     index_col = None
-    cdef vector[unordered_map[string, string]] per_file_user_data = \
-        c_result.metadata.per_file_user_data
-
+    is_range_index = True
     column_index_type = None
     index_col_names = None
-    is_range_index = True
+    meta = None
+    cdef vector[unordered_map[string, string]] per_file_user_data = \
+        table_meta.per_file_user_data
     for single_file in per_file_user_data:
         json_str = single_file[b'pandas'].decode('utf-8')
-        meta = None
         if json_str != "":
             meta = json.loads(json_str)
             file_is_range_index, index_col, column_index_type = _parse_metadata(meta)
@@ -220,13 +195,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
                 if c['field_name'] == idx_col:
                     index_col_names[idx_col] = c['name']
 
-    df = cudf.DataFrame._from_data(*data_from_unique_ptr(
-        move(c_result.tbl),
-        column_names=names
-    ))
-
-    update_struct_field_names(df, c_result.metadata.schema_info)
-
     if meta is not None:
         # Book keep each column metadata as the order
         # of `meta["columns"]` and `column_names` are not
@@ -319,9 +287,65 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
         if use_pandas_metadata:
             df.index.names = index_col
 
-    # Set column dtype for empty types.
     if len(df._data.names) == 0 and column_index_type is not None:
         df._data.label_dtype = cudf.dtype(column_index_type)
+
+    return df
+
+
+cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
+                   use_pandas_metadata=True,
+                   Expression filters=None):
+    """
+    Cython function to call into libcudf API, see `read_parquet`.
+
+    filters, if not None, should be an Expression that evaluates to a
+    boolean predicate as a function of columns being read.
+
+    See Also
+    --------
+    cudf.io.parquet.read_parquet
+    cudf.io.parquet.to_parquet
+    """
+
+    # Convert NativeFile buffers to NativeFileDatasource,
+    # but save original buffers in case we need to use
+    # pyarrow for metadata processing
+    # (See: https://github.com/rapidsai/cudf/issues/9599)
+    pa_buffers = []
+    for i, datasource in enumerate(filepaths_or_buffers):
+        if isinstance(datasource, NativeFile):
+            pa_buffers.append(datasource)
+            filepaths_or_buffers[i] = NativeFileDatasource(datasource)
+
+    cdef cudf_io_types.source_info source = make_source_info(
+        filepaths_or_buffers)
+
+    cdef vector[vector[size_type]] cpp_row_groups
+    if row_groups is not None:
+        cpp_row_groups = row_groups
+
+    # Setup parquet reader arguments
+    cdef parquet_reader_options args
+    cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options(
+        source, cpp_row_groups, use_pandas_metadata, filters, columns)
+    args, allow_range_index = c_res.first, c_res.second
+
+    # Read Parquet
+    cdef cudf_io_types.table_with_metadata c_result
+
+    with nogil:
+        c_result = move(parquet_reader(args))
+
+    names = [info.name.decode() for info in c_result.metadata.schema_info]
+
+    df = cudf.DataFrame._from_data(*data_from_unique_ptr(
+        move(c_result.tbl),
+        column_names=names
+    ))
+    df = _process_metadata(df, c_result.metadata, names, row_groups,
+                           filepaths_or_buffers, pa_buffers,
+                           allow_range_index, use_pandas_metadata)
     return df
 
 cpdef read_parquet_metadata(filepaths_or_buffers):
@@ -767,6 +791,102 @@ cdef class ParquetWriter:
         self.initialized = True
 
 
+cdef class ParquetReader:
+    cdef bool initialized
+    cdef unique_ptr[cpp_chunked_parquet_reader] reader
+    cdef size_t chunk_read_limit
+    cdef size_t pass_read_limit
+    cdef size_t row_group_size_bytes
+    cdef table_metadata result_meta
+    cdef vector[unordered_map[string, string]] per_file_user_data
+    cdef object pandas_meta
+    cdef list pa_buffers
+    cdef bool allow_range_index
+    cdef object row_groups
+    cdef object filepaths_or_buffers
+    cdef object names
+    cdef object column_index_type
+    cdef object index_col_names
+    cdef bool is_range_index
+    cdef object index_col
+    cdef bool cpp_use_pandas_metadata
+
+    def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None,
+                  use_pandas_metadata=True,
+                  size_t chunk_read_limit=0,
+                  size_t pass_read_limit=1024000000):
+
+        # Convert NativeFile buffers to NativeFileDatasource,
+        # but save original buffers in case we need to use
+        # pyarrow for metadata processing
+        # (See: https://github.com/rapidsai/cudf/issues/9599)
+
+        pa_buffers = []
+        for i, datasource in enumerate(filepaths_or_buffers):
+            if isinstance(datasource, NativeFile):
+                pa_buffers.append(datasource)
+                filepaths_or_buffers[i] = NativeFileDatasource(datasource)
+        self.pa_buffers = pa_buffers
+        cdef cudf_io_types.source_info source = make_source_info(
+            filepaths_or_buffers)
+
+        self.cpp_use_pandas_metadata = use_pandas_metadata
+
+        cdef vector[vector[size_type]] cpp_row_groups
+        if row_groups is not None:
+            cpp_row_groups = row_groups
+        cdef parquet_reader_options args
+        cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options(
+            source, cpp_row_groups, use_pandas_metadata, None, columns)
+        args, self.allow_range_index = c_res.first, c_res.second
+
+        with nogil:
+            self.reader.reset(
+                new cpp_chunked_parquet_reader(
+                    chunk_read_limit,
+                    pass_read_limit,
+                    args
+                )
+            )
+        self.initialized = False
+        self.row_groups = row_groups
+        self.filepaths_or_buffers = filepaths_or_buffers
+
+    def _has_next(self):
+        cdef bool res
+        with nogil:
+            res = self.reader.get()[0].has_next()
+        return res
+
+    def _read_chunk(self):
+        # Read Parquet
+        cdef cudf_io_types.table_with_metadata c_result
+
+        with nogil:
+            c_result = move(self.reader.get()[0].read_chunk())
+
+        if not self.initialized:
+            self.names = [info.name.decode() for info in c_result.metadata.schema_info]
+            self.result_meta = c_result.metadata
+
+        df = cudf.DataFrame._from_data(*data_from_unique_ptr(
+            move(c_result.tbl),
+            column_names=self.names,
+        ))
+
+        self.initialized = True
+        return df
+
+    def read(self):
+        dfs = []
+        while self._has_next():
+            dfs.append(self._read_chunk())
+        df = cudf.concat(dfs)
+        df = _process_metadata(df, self.result_meta, self.names, self.row_groups,
+                               self.filepaths_or_buffers, self.pa_buffers,
+                               self.allow_range_index, self.cpp_use_pandas_metadata)
+        return df
+
 cpdef merge_filemetadata(object filemetadata_list):
     """
     Cython function to call into libcudf API, see `merge_row_group_metadata`.
diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd
index 33a594b432f..fb98650308a 100644
--- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd
@@ -283,6 +283,18 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
             vector[string] column_chunks_file_paths,
         ) except +
 
+    cdef cppclass chunked_parquet_reader:
+        chunked_parquet_reader() except +
+        chunked_parquet_reader(
+            size_t chunk_read_limit,
+            const parquet_reader_options& options) except +
+        chunked_parquet_reader(
+            size_t chunk_read_limit,
+            size_t pass_read_limit,
+            const parquet_reader_options& options) except +
+        bool has_next() except +
+        cudf_io_types.table_with_metadata read_chunk() except +
+
     cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata(
         const vector[unique_ptr[vector[uint8_t]]]& metadata_list
     ) except +
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index e32fdacd8d6..2596fe8cd37 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -22,6 +22,7 @@
 from pyarrow import fs as pa_fs, parquet as pq
 
 import cudf
+from cudf._lib.parquet import ParquetReader
 from cudf.io.parquet import (
     ParquetDatasetWriter,
     ParquetWriter,
@@ -3407,3 +3408,29 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema():
 
     # Check results
     assert_eq(expected, got)
+
+
+@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
+@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
+@pytest.mark.parametrize("use_pandas_metadata", [True, False])
+@pytest.mark.parametrize("row_groups", [[[0]], None, [[0, 1]]])
+def test_parquet_chunked_reader(
+    chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
+):
+    df = pd.DataFrame(
+        {"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000}
+    )
+    buffer = BytesIO()
+    df.to_parquet(buffer)
+    reader = ParquetReader(
+        [buffer],
+        chunk_read_limit=chunk_read_limit,
+        pass_read_limit=pass_read_limit,
+        use_pandas_metadata=use_pandas_metadata,
+        row_groups=row_groups,
+    )
+    expected = cudf.read_parquet(
+        buffer, use_pandas_metadata=use_pandas_metadata, row_groups=row_groups
+    )
+    actual = reader.read()
+    assert_eq(expected, actual)
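Reviewer note (commentary, not part of the patch): below is a minimal usage sketch of the chunked API this diff introduces, built only from names that appear above (ParquetReader, chunk_read_limit, pass_read_limit, read()). The sample data and limit values are illustrative assumptions, not anything this change prescribes. In libcudf's chunked_parquet_reader, chunk_read_limit bounds the bytes returned per output chunk and pass_read_limit bounds the memory used for reading and decompression, with 0 meaning no limit.

    from io import BytesIO

    import pandas as pd

    import cudf
    from cudf._lib.parquet import ParquetReader

    # Write a sample Parquet file into an in-memory buffer.
    buffer = BytesIO()
    pd.DataFrame(
        {"a": list(range(100_000)), "b": ["x", "y"] * 50_000}
    ).to_parquet(buffer)

    # Illustrative limits: decode with at most ~256 MB of working memory
    # per pass, and emit output chunks of at most ~10 MB each.
    reader = ParquetReader(
        [buffer],
        chunk_read_limit=10_000_000,
        pass_read_limit=256_000_000,
    )

    # read() drains the chunk stream and concatenates the pieces, so the
    # result should match a one-shot cudf.read_parquet on the same source
    # (this is exactly what test_parquet_chunked_reader asserts above).
    got = reader.read()
    expected = cudf.read_parquet(buffer)
    assert got.equals(expected)

Design-wise, the diff factors option building (_setup_parquet_reader_options) and pandas-metadata post-processing (_process_metadata) out of read_parquet so the one-shot and chunked paths share them; only the middle step, materializing the table, differs between parquet_reader(args) and the read_chunk() loop.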