diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index 3f2e1fa5e6c..074f690d2c7 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -91,7 +91,8 @@ class reader { class chunked_reader : private reader { public: /** - * @brief Constructor from a read size limit and an array of data sources with reader options. + * @brief Constructor from an output size memory limit and an input size memory limit and an array + * of data sources with reader options. * * The typical usage should be similar to this: * ``` @@ -102,17 +103,45 @@ class chunked_reader : private reader { * * ``` * - * If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the - * whole file and return a table containing all rows. + * If `chunk_read_limit == 0` (i.e., no output limit), and `pass_read_limit == 0` (no input + * temporary memory size limit) a call to `read_chunk()` will read the whole file and return a + * table containing all rows. + * + * The chunk_read_limit parameter controls the size of the output chunks produces. If the user + * specifies 100 MB of data, the reader will attempt to return chunks containing tables that have + * a total bytes size (over all columns) of 100 MB or less. This is a soft limit and the code + * will not fail if it cannot satisfy the limit. It will make a best-effort atttempt only. + * + * The pass_read_limit parameter controls how much temporary memory is used in the process of + * decoding the file. The primary contributor to this memory usage is the uncompressed size of + * the data read out of the file and the decompressed (but not yet decoded) size of the data. The + * granularity of a given pass is at the row group level. It will not attempt to read at the sub + * row-group level. + * + * Combined, the way to visualize passes and chunks is as follows: + * + * @code{.pseudo} + * for(each pass){ + * for(each output chunk within a pass){ + * return a table that fits within the output chunk limit + * } + * } + * @endcode + * + * With a pass_read_limit of `0` you are simply saying you have one pass that reads the entire + * file as normal. * * @param chunk_read_limit Limit on total number of bytes to be returned per read, - * or `0` if there is no limit + * or `0` if there is no limit + * @param pass_read_limit Limit on total amount of memory used for temporary computations during + * loading, or `0` if there is no limit * @param sources Input `datasource` objects to read the dataset from * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ explicit chunked_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 788ff15f3c1..deaf23d405a 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -445,6 +445,30 @@ class chunked_parquet_reader { parquet_reader_options const& options, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** + * @brief Constructor for chunked reader. + * + * This constructor requires the same `parquet_reader_option` parameter as in + * `cudf::read_parquet()`, with additional parameters to specify the size byte limit of the + * output table for each reading, and a byte limit on the amount of temporary memory to use + * when reading. pass_read_limit affects how many row groups we can read at a time by limiting + * the amount of memory dedicated to decompression space. pass_read_limit is a hint, not an + * absolute limit - if a single row group cannot fit within the limit given, it will still be + * loaded. + * + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + * @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or + * `0` if there is no limit + * @param options The options used to read Parquet file + * @param mr Device memory resource to use for device memory allocation + */ + chunked_parquet_reader( + std::size_t chunk_read_limit, + std::size_t pass_read_limit, + parquet_reader_options const& options, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Destructor, destroying the internal reader instance. * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 45f8b0f8822..392a7850886 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -562,6 +562,23 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, parquet_reader_options const& options, rmm::mr::device_memory_resource* mr) : reader{std::make_unique(chunk_read_limit, + 0, + make_datasources(options.get_source()), + options, + cudf::get_default_stream(), + mr)} +{ +} + +/** + * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader + */ +chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, + parquet_reader_options const& options, + rmm::mr::device_memory_resource* mr) + : reader{std::make_unique(chunk_read_limit, + pass_read_limit, make_datasources(options.get_source()), options, cudf::get_default_stream(), diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index a760c2448dc..572fe5c2820 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -306,33 +306,74 @@ struct ColumnChunkDesc { }; /** - * @brief Struct to store raw/intermediate file data before parsing. + * @brief The row_group_info class + */ +struct row_group_info { + size_type index; // row group index within a file. aggregate_reader_metadata::get_row_group() is + // called with index and source_index + size_t start_row; + size_type source_index; // file index. + + row_group_info() = default; + + row_group_info(size_type index, size_t start_row, size_type source_index) + : index{index}, start_row{start_row}, source_index{source_index} + { + } +}; + +/** + * @brief Struct to store file-level data that remains constant for + * all passes/chunks for the file. */ struct file_intermediate_data { + // all row groups to read + std::vector row_groups{}; + + // all chunks from the selected row groups. We may end up reading these chunks progressively + // instead of all at once + std::vector chunks{}; + + // skip_rows/num_rows values for the entire file. these need to be adjusted per-pass because we + // may not be visiting every row group that contains these bounds + size_t global_skip_rows; + size_t global_num_rows; +}; + +/** + * @brief Structs to identify the reading row range for each chunk of rows in chunked reading. + */ +struct chunk_read_info { + size_t skip_rows; + size_t num_rows; +}; + +/** + * @brief Struct to store pass-level data that remains constant for a single pass. + */ +struct pass_intermediate_data { std::vector> raw_page_data; rmm::device_buffer decomp_page_data; + + // rowgroup, chunk and page information for the current pass. + std::vector row_groups{}; cudf::detail::hostdevice_vector chunks{}; cudf::detail::hostdevice_vector pages_info{}; cudf::detail::hostdevice_vector page_nesting_info{}; cudf::detail::hostdevice_vector page_nesting_decode_info{}; - rmm::device_buffer level_decode_data; - int level_type_size; -}; - -/** - * @brief Struct to store intermediate page data for parsing each chunk of rows in chunked reading. - */ -struct chunk_intermediate_data { rmm::device_uvector page_keys{0, rmm::cuda_stream_default}; rmm::device_uvector page_index{0, rmm::cuda_stream_default}; rmm::device_uvector str_dict_index{0, rmm::cuda_stream_default}; -}; -/** - * @brief Structs to identify the reading row range for each chunk of rows in chunked reading. - */ -struct chunk_read_info { + std::vector output_chunk_read_info; + std::size_t current_output_chunk{0}; + + rmm::device_buffer level_decode_data{}; + int level_type_size{0}; + + // skip_rows and num_rows values for this particular pass. these may be adjusted values from the + // global values stored in file_intermediate_data. size_t skip_rows; size_t num_rows; }; diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp index 7365c102d8f..1e87447006d 100644 --- a/cpp/src/io/parquet/reader.cpp +++ b/cpp/src/io/parquet/reader.cpp @@ -43,12 +43,14 @@ table_with_metadata reader::read(parquet_reader_options const& options) } chunked_reader::chunked_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - _impl = std::make_unique(chunk_read_limit, std::move(sources), options, stream, mr); + _impl = std::make_unique( + chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr); } chunked_reader::~chunked_reader() = default; diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8b0a0bd4eb0..c7f0d20ba9c 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -29,10 +29,10 @@ namespace cudf::io::detail::parquet { void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto& page_nesting = _file_itm_data.page_nesting_info; - auto& page_nesting_decode = _file_itm_data.page_nesting_decode_info; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; + auto& page_nesting = _pass_itm_data->page_nesting_info; + auto& page_nesting_decode = _pass_itm_data->page_nesting_decode_info; // Should not reach here if there is no page data. CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -55,7 +55,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) std::vector col_sizes(_input_columns.size(), 0L); if (has_strings) { gpu::ComputePageStringSizes( - pages, chunks, skip_rows, num_rows, _file_itm_data.level_type_size, _stream); + pages, chunks, skip_rows, num_rows, _pass_itm_data->level_type_size, _stream); col_sizes = calculate_page_string_offsets(); @@ -167,7 +167,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) int const nkernels = std::bitset<32>(kernel_mask).count(); auto streams = cudf::detail::fork_streams(_stream, nkernels); - auto const level_type_size = _file_itm_data.level_type_size; + auto const level_type_size = _pass_itm_data->level_type_size; // launch string decoder int s_idx = 0; @@ -266,6 +266,7 @@ reader::impl::impl(std::vector>&& sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) : impl(0 /*chunk_read_limit*/, + 0 /*input_pass_read_limit*/, std::forward>>(sources), options, stream, @@ -274,11 +275,16 @@ reader::impl::impl(std::vector>&& sources, } reader::impl::impl(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : _stream{stream}, _mr{mr}, _sources{std::move(sources)}, _chunk_read_limit{chunk_read_limit} + : _stream{stream}, + _mr{mr}, + _sources{std::move(sources)}, + _output_chunk_read_limit{chunk_read_limit}, + _input_pass_read_limit{pass_read_limit} { // Open and parse the source dataset metadata _metadata = std::make_unique(_sources); @@ -302,11 +308,8 @@ reader::impl::impl(std::size_t chunk_read_limit, _timestamp_type.id()); // Save the states of the output buffers for reuse in `chunk_read()`. - // Don't need to do it if we read the file all at once. - if (_chunk_read_limit > 0) { - for (auto const& buff : _output_buffers) { - _output_buffers_template.emplace_back(inline_column_buffer::empty_like(buff)); - } + for (auto const& buff : _output_buffers) { + _output_buffers_template.emplace_back(inline_column_buffer::empty_like(buff)); } } @@ -316,32 +319,62 @@ void reader::impl::prepare_data(int64_t skip_rows, host_span const> row_group_indices, std::optional> filter) { - if (_file_preprocessed) { return; } + // if we have not preprocessed at the whole-file level, do that now + if (!_file_preprocessed) { + // if filter is not empty, then create output types as vector and pass for filtering. + std::vector output_types; + if (filter.has_value()) { + std::transform(_output_buffers.cbegin(), + _output_buffers.cend(), + std::back_inserter(output_types), + [](auto const& col) { return col.type; }); + } + std::tie( + _file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) = + _metadata->select_row_groups( + row_group_indices, skip_rows, num_rows, output_types, filter, _stream); + + if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() && + not _input_columns.empty()) { + // fills in chunk information without physically loading or decompressing + // the associated data + load_global_chunk_info(); + + // compute schedule of input reads. Each rowgroup contains 1 chunk per column. For now + // we will read an entire row group at a time. However, it is possible to do + // sub-rowgroup reads if we made some estimates on individual chunk sizes (tricky) and + // changed the high level structure such that we weren't always reading an entire table's + // worth of columns at once. + compute_input_pass_row_group_info(); + } - // if filter is not empty, then create output types as vector and pass for filtering. - std::vector output_types; - if (filter.has_value()) { - std::transform(_output_buffers.cbegin(), - _output_buffers.cend(), - std::back_inserter(output_types), - [](auto const& col) { return col.type; }); + _file_preprocessed = true; } - auto const [skip_rows_corrected, num_rows_corrected, row_groups_info] = - _metadata->select_row_groups( - row_group_indices, skip_rows, num_rows, output_types, filter, _stream); - - if (num_rows_corrected > 0 && not row_groups_info.empty() && not _input_columns.empty()) { - load_and_decompress_data(row_groups_info, num_rows_corrected); - preprocess_pages( - skip_rows_corrected, num_rows_corrected, uses_custom_row_bounds, _chunk_read_limit); - - if (_chunk_read_limit == 0) { // read the whole file at once - CUDF_EXPECTS(_chunk_read_info.size() == 1, - "Reading the whole file should yield only one chunk."); + + // if we have to start a new pass, do that now + if (!_pass_preprocessed) { + auto const num_passes = _input_pass_row_group_offsets.size() - 1; + + // always create the pass struct, even if we end up with no passes. + // this will also cause the previous pass information to be deleted + _pass_itm_data = std::make_unique(); + + if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() && + not _input_columns.empty() && _current_input_pass < num_passes) { + // setup the pass_intermediate_info for this pass. + setup_pass(); + + load_and_decompress_data(); + preprocess_pages(uses_custom_row_bounds, _output_chunk_read_limit); + + if (_output_chunk_read_limit == 0) { // read the whole file at once + CUDF_EXPECTS(_pass_itm_data->output_chunk_read_info.size() == 1, + "Reading the whole file should yield only one chunk."); + } } - } - _file_preprocessed = true; + _pass_preprocessed = true; + } } void reader::impl::populate_metadata(table_metadata& out_metadata) @@ -371,11 +404,12 @@ table_with_metadata reader::impl::read_chunk_internal( auto out_columns = std::vector>{}; out_columns.reserve(_output_buffers.size()); - if (!has_next() || _chunk_read_info.empty()) { + if (!has_next() || _pass_itm_data->output_chunk_read_info.empty()) { return finalize_output(out_metadata, out_columns, filter); } - auto const& read_info = _chunk_read_info[_current_read_chunk++]; + auto const& read_info = + _pass_itm_data->output_chunk_read_info[_pass_itm_data->current_output_chunk]; // Allocate memory buffers for the output columns. allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds); @@ -428,6 +462,17 @@ table_with_metadata reader::impl::finalize_output( _output_metadata = std::make_unique(out_metadata); } + // advance chunks/passes as necessary + _pass_itm_data->current_output_chunk++; + _chunk_count++; + if (_pass_itm_data->current_output_chunk >= _pass_itm_data->output_chunk_read_info.size()) { + _pass_itm_data->current_output_chunk = 0; + _pass_itm_data->output_chunk_read_info.clear(); + + _current_input_pass++; + _pass_preprocessed = false; + } + if (filter.has_value()) { auto read_table = std::make_unique(std::move(out_columns)); auto predicate = cudf::detail::compute_column( @@ -447,7 +492,8 @@ table_with_metadata reader::impl::read( host_span const> row_group_indices, std::optional> filter) { - CUDF_EXPECTS(_chunk_read_limit == 0, "Reading the whole file must not have non-zero byte_limit."); + CUDF_EXPECTS(_output_chunk_read_limit == 0, + "Reading the whole file must not have non-zero byte_limit."); table_metadata metadata; populate_metadata(metadata); auto expr_conv = named_to_reference_converter(filter, metadata); @@ -461,7 +507,7 @@ table_with_metadata reader::impl::read_chunk() { // Reset the output buffers to their original states (right after reader construction). // Don't need to do it if we read the file all at once. - if (_chunk_read_limit > 0) { + if (_chunk_count > 0) { _output_buffers.resize(0); for (auto const& buff : _output_buffers_template) { _output_buffers.emplace_back(inline_column_buffer::empty_like(buff)); @@ -483,7 +529,11 @@ bool reader::impl::has_next() true /*uses_custom_row_bounds*/, {} /*row_group_indices, empty means read all row groups*/, std::nullopt /*filter*/); - return _current_read_chunk < _chunk_read_info.size(); + + auto const num_input_passes = + _input_pass_row_group_offsets.size() == 0 ? 0 : _input_pass_row_group_offsets.size() - 1; + return (_pass_itm_data->current_output_chunk < _pass_itm_data->output_chunk_read_info.size()) || + (_current_input_pass < num_input_passes); } namespace { diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index a980670e465..9445e4d1648 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -90,17 +90,20 @@ class reader::impl { * ``` * * Reading the whole given file at once through `read()` function is still supported if - * `chunk_read_limit == 0` (i.e., no reading limit). - * In such case, `read_chunk()` will also return rows of the entire file. + * `chunk_read_limit == 0` (i.e., no reading limit) and `pass_read_limit == 0` (no temporary + * memory limit) In such case, `read_chunk()` will also return rows of the entire file. * * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit + * @param pass_read_limit Limit on memory usage for the purposes of decompression and processing + * of input, or `0` if there is no limit. * @param sources Dataset sources * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ explicit impl(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, @@ -133,22 +136,22 @@ class reader::impl { host_span const> row_group_indices, std::optional> filter); + void load_global_chunk_info(); + void compute_input_pass_row_group_info(); + void setup_pass(); + /** * @brief Create chunk information and start file reads * - * @param row_groups_info vector of information about row groups to read - * @param num_rows Maximum number of rows to read * @return pair of boolean indicating if compressed chunks were found and a vector of futures for * read completion */ - std::pair>> create_and_read_column_chunks( - cudf::host_span const row_groups_info, size_type num_rows); + std::pair>> read_and_decompress_column_chunks(); /** * @brief Load and decompress the input file(s) into memory. */ - void load_and_decompress_data(cudf::host_span const row_groups_info, - size_type num_rows); + void load_and_decompress_data(); /** * @brief Perform some preprocessing for page data and also compute the split locations @@ -161,17 +164,12 @@ class reader::impl { * * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders). * - * @param skip_rows Crop all rows below skip_rows - * @param num_rows Maximum number of rows to read * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific * bounds * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit */ - void preprocess_pages(size_t skip_rows, - size_t num_rows, - bool uses_custom_row_bounds, - size_t chunk_read_limit); + void preprocess_pages(bool uses_custom_row_bounds, size_t chunk_read_limit); /** * @brief Allocate nesting information storage for all pages and set pointers to it. @@ -278,12 +276,28 @@ class reader::impl { std::optional> _reader_column_schema; data_type _timestamp_type{type_id::EMPTY}; - // Variables used for chunked reading: + // chunked reading happens in 2 parts: + // + // At the top level there is the "pass" in which we try and limit the + // total amount of temporary memory (compressed data, decompressed data) in use + // via _input_pass_read_limit. + // + // Within a pass, we produce one or more chunks of output, whose maximum total + // byte size is controlled by _output_chunk_read_limit. + cudf::io::parquet::gpu::file_intermediate_data _file_itm_data; - cudf::io::parquet::gpu::chunk_intermediate_data _chunk_itm_data; - std::vector _chunk_read_info; - std::size_t _chunk_read_limit{0}; - std::size_t _current_read_chunk{0}; + std::unique_ptr _pass_itm_data; + + // an array of offsets into _file_itm_data::global_chunks. Each pair of offsets represents + // the start/end of the chunks to be loaded for a given pass. + std::vector _input_pass_row_group_offsets{}; + std::vector _input_pass_row_count{}; + std::size_t _current_input_pass{0}; + std::size_t _chunk_count{0}; + + std::size_t _output_chunk_read_limit{0}; + std::size_t _input_pass_read_limit{0}; + bool _pass_preprocessed{false}; bool _file_preprocessed{false}; }; diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index f6dbeb275fc..fcaa610fbb7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -344,7 +344,7 @@ std::vector aggregate_reader_metadata::get_pandas_index_names() con return names; } -std::tuple> +std::tuple> aggregate_reader_metadata::select_row_groups( host_span const> row_group_indices, int64_t skip_rows_opt, @@ -362,7 +362,7 @@ aggregate_reader_metadata::select_row_groups( host_span const>(filtered_row_group_indices.value()); } } - std::vector selection; + std::vector selection; auto [rows_to_skip, rows_to_read] = [&]() { if (not row_group_indices.empty()) { return std::pair{}; } auto const from_opts = cudf::io::detail::skip_rows_num_rows_from_options( diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 751ffc33123..61e4f94df0f 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -53,19 +53,6 @@ using namespace cudf::io::parquet; : data_type{t_id}; } -/** - * @brief The row_group_info class - */ -struct row_group_info { - size_type const index; - size_t const start_row; // TODO source index - size_type const source_index; - row_group_info(size_type index, size_t start_row, size_type source_index) - : index(index), start_row(start_row), source_index(source_index) - { - } -}; - /** * @brief Class for parsing dataset metadata */ @@ -194,7 +181,7 @@ class aggregate_reader_metadata { * @return A tuple of corrected row_start, row_count and list of row group indexes and its * starting row */ - [[nodiscard]] std::tuple> select_row_groups( + [[nodiscard]] std::tuple> select_row_groups( host_span const> row_group_indices, int64_t row_start, std::optional const& row_count, diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index a2db0de26bb..c731c467f2c 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -577,10 +577,10 @@ int decode_page_headers(cudf::detail::hostdevice_vector& c void reader::impl::allocate_nesting_info() { - auto const& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto& page_nesting_info = _file_itm_data.page_nesting_info; - auto& page_nesting_decode_info = _file_itm_data.page_nesting_decode_info; + auto const& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; + auto& page_nesting_info = _pass_itm_data->page_nesting_info; + auto& page_nesting_decode_info = _pass_itm_data->page_nesting_decode_info; // compute total # of page_nesting infos needed and allocate space. doing this in one // buffer to keep it to a single gpu allocation @@ -702,38 +702,39 @@ void reader::impl::allocate_nesting_info() void reader::impl::allocate_level_decode_space() { - auto& pages = _file_itm_data.pages_info; + auto& pages = _pass_itm_data->pages_info; // TODO: this could be made smaller if we ignored dictionary pages and pages with no // repetition data. size_t const per_page_decode_buf_size = - LEVEL_DECODE_BUF_SIZE * 2 * _file_itm_data.level_type_size; + LEVEL_DECODE_BUF_SIZE * 2 * _pass_itm_data->level_type_size; auto const decode_buf_size = per_page_decode_buf_size * pages.size(); - _file_itm_data.level_decode_data = + _pass_itm_data->level_decode_data = rmm::device_buffer(decode_buf_size, _stream, rmm::mr::get_current_device_resource()); // distribute the buffers - uint8_t* buf = static_cast(_file_itm_data.level_decode_data.data()); + uint8_t* buf = static_cast(_pass_itm_data->level_decode_data.data()); for (size_t idx = 0; idx < pages.size(); idx++) { auto& p = pages[idx]; p.lvl_decode_buf[gpu::level_type::DEFINITION] = buf; - buf += (LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size); + buf += (LEVEL_DECODE_BUF_SIZE * _pass_itm_data->level_type_size); p.lvl_decode_buf[gpu::level_type::REPETITION] = buf; - buf += (LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size); + buf += (LEVEL_DECODE_BUF_SIZE * _pass_itm_data->level_type_size); } } -std::pair>> reader::impl::create_and_read_column_chunks( - cudf::host_span const row_groups_info, size_type num_rows) +std::pair>> reader::impl::read_and_decompress_column_chunks() { - auto& raw_page_data = _file_itm_data.raw_page_data; - auto& chunks = _file_itm_data.chunks; + auto const& row_groups_info = _pass_itm_data->row_groups; + auto const num_rows = _pass_itm_data->num_rows; + + auto& raw_page_data = _pass_itm_data->raw_page_data; + auto& chunks = _pass_itm_data->chunks; // Descriptors for all the chunks that make up the selected columns auto const num_input_columns = _input_columns.size(); auto const num_chunks = row_groups_info.size() * num_input_columns; - chunks = cudf::detail::hostdevice_vector(0, num_chunks, _stream); // Association between each column chunk and its source std::vector chunk_source_map(num_chunks); @@ -747,13 +748,68 @@ std::pair>> reader::impl::create_and_read_co // Initialize column chunk information size_t total_decompressed_size = 0; auto remaining_rows = num_rows; - std::vector> read_rowgroup_tasks; + std::vector> read_chunk_tasks; + size_type chunk_count = 0; for (auto const& rg : row_groups_info) { auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); - auto const row_group_start = rg.start_row; auto const row_group_source = rg.source_index; auto const row_group_rows = std::min(remaining_rows, row_group.num_rows); + // generate ColumnChunkDesc objects for everything to be decoded (all input columns) + for (size_t i = 0; i < num_input_columns; ++i) { + auto const& col = _input_columns[i]; + // look up metadata + auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx); + + column_chunk_offsets[chunk_count] = + (col_meta.dictionary_page_offset != 0) + ? std::min(col_meta.data_page_offset, col_meta.dictionary_page_offset) + : col_meta.data_page_offset; + + // Map each column chunk to its column index and its source index + chunk_source_map[chunk_count] = row_group_source; + + if (col_meta.codec != Compression::UNCOMPRESSED) { + total_decompressed_size += col_meta.total_uncompressed_size; + } + + chunk_count++; + } + remaining_rows -= row_group_rows; + } + + // Read compressed chunk data to device memory + read_chunk_tasks.push_back(read_column_chunks_async(_sources, + raw_page_data, + chunks, + 0, + chunks.size(), + column_chunk_offsets, + chunk_source_map, + _stream)); + + CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read."); + + return {total_decompressed_size > 0, std::move(read_chunk_tasks)}; +} + +void reader::impl::load_global_chunk_info() +{ + auto const num_rows = _file_itm_data.global_num_rows; + auto const& row_groups_info = _file_itm_data.row_groups; + auto& chunks = _file_itm_data.chunks; + + // Descriptors for all the chunks that make up the selected columns + auto const num_input_columns = _input_columns.size(); + auto const num_chunks = row_groups_info.size() * num_input_columns; + + // Initialize column chunk information + auto remaining_rows = num_rows; + for (auto const& rg : row_groups_info) { + auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); + auto const row_group_start = rg.start_row; + auto const row_group_rows = std::min(remaining_rows, row_group.num_rows); + // generate ColumnChunkDesc objects for everything to be decoded (all input columns) for (size_t i = 0; i < num_input_columns; ++i) { auto col = _input_columns[i]; @@ -768,11 +824,6 @@ std::pair>> reader::impl::create_and_read_co schema.converted_type, schema.type_length); - column_chunk_offsets[chunks.size()] = - (col_meta.dictionary_page_offset != 0) - ? std::min(col_meta.data_page_offset, col_meta.dictionary_page_offset) - : col_meta.data_page_offset; - chunks.push_back(gpu::ColumnChunkDesc(col_meta.total_compressed_size, nullptr, col_meta.num_values, @@ -792,92 +843,171 @@ std::pair>> reader::impl::create_and_read_co clock_rate, i, col.schema_idx)); - - // Map each column chunk to its column index and its source index - chunk_source_map[chunks.size() - 1] = row_group_source; - - if (col_meta.codec != Compression::UNCOMPRESSED) { - total_decompressed_size += col_meta.total_uncompressed_size; - } } + remaining_rows -= row_group_rows; } +} - // Read compressed chunk data to device memory - read_rowgroup_tasks.push_back(read_column_chunks_async(_sources, - raw_page_data, - chunks, - 0, - chunks.size(), - column_chunk_offsets, - chunk_source_map, - _stream)); +void reader::impl::compute_input_pass_row_group_info() +{ + // at this point, row_groups has already been filtered down to just the row groups we need to + // handle optional skip_rows/num_rows parameters. + auto const& row_groups_info = _file_itm_data.row_groups; + + // if the user hasn't specified an input size limit, read everything in a single pass. + if (_input_pass_read_limit == 0) { + _input_pass_row_group_offsets.push_back(0); + _input_pass_row_group_offsets.push_back(row_groups_info.size()); + return; + } - CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read."); + // generate passes. make sure to account for the case where a single row group doesn't fit within + // + std::size_t const read_limit = + _input_pass_read_limit > 0 ? _input_pass_read_limit : std::numeric_limits::max(); + std::size_t cur_pass_byte_size = 0; + std::size_t cur_rg_start = 0; + std::size_t cur_row_count = 0; + _input_pass_row_group_offsets.push_back(0); + _input_pass_row_count.push_back(0); + + for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { + auto const& rgi = row_groups_info[cur_rg_index]; + auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); + + // can we add this row group + if (cur_pass_byte_size + row_group.total_byte_size >= read_limit) { + // A single row group (the current one) is larger than the read limit: + // We always need to include at least one row group, so end the pass at the end of the current + // row group + if (cur_rg_start == cur_rg_index) { + _input_pass_row_group_offsets.push_back(cur_rg_index + 1); + _input_pass_row_count.push_back(cur_row_count + row_group.num_rows); + cur_rg_start = cur_rg_index + 1; + cur_pass_byte_size = 0; + } + // End the pass at the end of the previous row group + else { + _input_pass_row_group_offsets.push_back(cur_rg_index); + _input_pass_row_count.push_back(cur_row_count); + cur_rg_start = cur_rg_index; + cur_pass_byte_size = row_group.total_byte_size; + } + } else { + cur_pass_byte_size += row_group.total_byte_size; + } + cur_row_count += row_group.num_rows; + } + // add the last pass if necessary + if (_input_pass_row_group_offsets.back() != row_groups_info.size()) { + _input_pass_row_group_offsets.push_back(row_groups_info.size()); + _input_pass_row_count.push_back(cur_row_count); + } +} - return {total_decompressed_size > 0, std::move(read_rowgroup_tasks)}; +void reader::impl::setup_pass() +{ + // this will also cause the previous pass information to be deleted + _pass_itm_data = std::make_unique(); + + // setup row groups to be loaded for this pass + auto const row_group_start = _input_pass_row_group_offsets[_current_input_pass]; + auto const row_group_end = _input_pass_row_group_offsets[_current_input_pass + 1]; + auto const num_row_groups = row_group_end - row_group_start; + _pass_itm_data->row_groups.resize(num_row_groups); + std::copy(_file_itm_data.row_groups.begin() + row_group_start, + _file_itm_data.row_groups.begin() + row_group_end, + _pass_itm_data->row_groups.begin()); + + auto const num_passes = _input_pass_row_group_offsets.size() - 1; + CUDF_EXPECTS(_current_input_pass < num_passes, "Encountered an invalid read pass index"); + + auto const chunks_per_rowgroup = _input_columns.size(); + auto const num_chunks = chunks_per_rowgroup * num_row_groups; + + auto chunk_start = _file_itm_data.chunks.begin() + (row_group_start * chunks_per_rowgroup); + auto chunk_end = _file_itm_data.chunks.begin() + (row_group_end * chunks_per_rowgroup); + + _pass_itm_data->chunks = + cudf::detail::hostdevice_vector(num_chunks, _stream); + std::copy(chunk_start, chunk_end, _pass_itm_data->chunks.begin()); + + // adjust skip_rows and num_rows by what's available in the row groups we are processing + if (num_passes == 1) { + _pass_itm_data->skip_rows = _file_itm_data.global_skip_rows; + _pass_itm_data->num_rows = _file_itm_data.global_num_rows; + } else { + auto const global_start_row = _file_itm_data.global_skip_rows; + auto const global_end_row = global_start_row + _file_itm_data.global_num_rows; + auto const start_row = std::max(_input_pass_row_count[_current_input_pass], global_start_row); + auto const end_row = std::min(_input_pass_row_count[_current_input_pass + 1], global_end_row); + + // skip_rows is always global in the sense that it is relative to the first row of + // everything we will be reading, regardless of what pass we are on. + // num_rows is how many rows we are reading this pass. + _pass_itm_data->skip_rows = global_start_row + _input_pass_row_count[_current_input_pass]; + _pass_itm_data->num_rows = end_row - start_row; + } } -void reader::impl::load_and_decompress_data( - cudf::host_span const row_groups_info, size_type num_rows) +void reader::impl::load_and_decompress_data() { // This function should never be called if `num_rows == 0`. - CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero."); + CUDF_EXPECTS(_pass_itm_data->num_rows > 0, "Number of reading rows must not be zero."); - auto& raw_page_data = _file_itm_data.raw_page_data; - auto& decomp_page_data = _file_itm_data.decomp_page_data; - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; + auto& raw_page_data = _pass_itm_data->raw_page_data; + auto& decomp_page_data = _pass_itm_data->decomp_page_data; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; - auto const [has_compressed_data, read_rowgroup_tasks] = - create_and_read_column_chunks(row_groups_info, num_rows); + auto const [has_compressed_data, read_chunks_tasks] = read_and_decompress_column_chunks(); - for (auto& task : read_rowgroup_tasks) { + for (auto& task : read_chunks_tasks) { task.wait(); } // Process dataset chunk pages into output columns auto const total_pages = count_page_headers(chunks, _stream); + if (total_pages <= 0) { return; } pages = cudf::detail::hostdevice_vector(total_pages, total_pages, _stream); - if (total_pages > 0) { - // decoding of column/page information - _file_itm_data.level_type_size = decode_page_headers(chunks, pages, _stream); - if (has_compressed_data) { - decomp_page_data = decompress_page_data(chunks, pages, _stream); - // Free compressed data - for (size_t c = 0; c < chunks.size(); c++) { - if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); } - } + // decoding of column/page information + _pass_itm_data->level_type_size = decode_page_headers(chunks, pages, _stream); + if (has_compressed_data) { + decomp_page_data = decompress_page_data(chunks, pages, _stream); + // Free compressed data + for (size_t c = 0; c < chunks.size(); c++) { + if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); } } + } - // build output column info - // walk the schema, building out_buffers that mirror what our final cudf columns will look - // like. important : there is not necessarily a 1:1 mapping between input columns and output - // columns. For example, parquet does not explicitly store a ColumnChunkDesc for struct - // columns. The "structiness" is simply implied by the schema. For example, this schema: - // required group field_id=1 name { - // required binary field_id=2 firstname (String); - // required binary field_id=3 middlename (String); - // required binary field_id=4 lastname (String); - // } - // will only contain 3 columns of data (firstname, middlename, lastname). But of course - // "name" is a struct column that we want to return, so we have to make sure that we - // create it ourselves. - // std::vector output_info = build_output_column_info(); - - // the following two allocate functions modify the page data - pages.device_to_host_sync(_stream); - { - // nesting information (sizes, etc) stored -per page- - // note : even for flat schemas, we allocate 1 level of "nesting" info - allocate_nesting_info(); + // build output column info + // walk the schema, building out_buffers that mirror what our final cudf columns will look + // like. important : there is not necessarily a 1:1 mapping between input columns and output + // columns. For example, parquet does not explicitly store a ColumnChunkDesc for struct + // columns. The "structiness" is simply implied by the schema. For example, this schema: + // required group field_id=1 name { + // required binary field_id=2 firstname (String); + // required binary field_id=3 middlename (String); + // required binary field_id=4 lastname (String); + // } + // will only contain 3 columns of data (firstname, middlename, lastname). But of course + // "name" is a struct column that we want to return, so we have to make sure that we + // create it ourselves. + // std::vector output_info = build_output_column_info(); + + // the following two allocate functions modify the page data + pages.device_to_host_sync(_stream); + { + // nesting information (sizes, etc) stored -per page- + // note : even for flat schemas, we allocate 1 level of "nesting" info + allocate_nesting_info(); - // level decode space - allocate_level_decode_space(); - } - pages.host_to_device_async(_stream); + // level decode space + allocate_level_decode_space(); } + pages.host_to_device_async(_stream); } namespace { @@ -1183,7 +1313,7 @@ std::vector find_splits(std::vector c */ std::vector compute_splits( cudf::detail::hostdevice_vector& pages, - gpu::chunk_intermediate_data const& id, + gpu::pass_intermediate_data const& id, size_t num_rows, size_t chunk_read_limit, rmm::cuda_stream_view stream) @@ -1539,13 +1669,12 @@ struct page_offset_output_iter { } // anonymous namespace -void reader::impl::preprocess_pages(size_t skip_rows, - size_t num_rows, - bool uses_custom_row_bounds, - size_t chunk_read_limit) +void reader::impl::preprocess_pages(bool uses_custom_row_bounds, size_t chunk_read_limit) { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; + auto const skip_rows = _pass_itm_data->skip_rows; + auto const num_rows = _pass_itm_data->num_rows; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; // compute page ordering. // @@ -1636,7 +1765,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, // Build index for string dictionaries since they can't be indexed // directly due to variable-sized elements - _chunk_itm_data.str_dict_index = + _pass_itm_data->str_dict_index = cudf::detail::make_zeroed_device_uvector_async( total_str_dict_indexes, _stream, rmm::mr::get_current_device_resource()); @@ -1646,7 +1775,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, "Column/page schema index mismatch"); if (is_dict_chunk(chunks[c])) { - chunks[c].str_dict_index = _chunk_itm_data.str_dict_index.data() + str_ofs; + chunks[c].str_dict_index = _pass_itm_data->str_dict_index.data() + str_ofs; str_ofs += pages[page_count].num_input_values; } @@ -1677,7 +1806,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, std::numeric_limits::max(), true, // compute num_rows chunk_read_limit > 0, // compute string sizes - _file_itm_data.level_type_size, + _pass_itm_data->level_type_size, _stream); // computes: @@ -1699,20 +1828,21 @@ void reader::impl::preprocess_pages(size_t skip_rows, } // preserve page ordering data for string decoder - _chunk_itm_data.page_keys = std::move(page_keys); - _chunk_itm_data.page_index = std::move(page_index); + _pass_itm_data->page_keys = std::move(page_keys); + _pass_itm_data->page_index = std::move(page_index); // compute splits if necessary. otherwise return a single split representing // the whole file. - _chunk_read_info = chunk_read_limit > 0 - ? compute_splits(pages, _chunk_itm_data, num_rows, chunk_read_limit, _stream) - : std::vector{{skip_rows, num_rows}}; + _pass_itm_data->output_chunk_read_info = + _output_chunk_read_limit > 0 + ? compute_splits(pages, *_pass_itm_data, num_rows, chunk_read_limit, _stream) + : std::vector{{skip_rows, num_rows}}; } void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds) { - auto const& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; + auto const& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; // Should not reach here if there is no page data. CUDF_EXPECTS(pages.size() > 0, "There is no page to parse"); @@ -1729,7 +1859,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses num_rows, false, // num_rows is already computed false, // no need to compute string sizes - _file_itm_data.level_type_size, + _pass_itm_data->level_type_size, _stream); // print_pages(pages, _stream); @@ -1766,7 +1896,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // compute output column sizes by examining the pages of the -input- columns if (has_lists) { - auto& page_index = _chunk_itm_data.page_index; + auto& page_index = _pass_itm_data->page_index; std::vector h_cols_info; h_cols_info.reserve(_input_columns.size()); @@ -1846,10 +1976,10 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses std::vector reader::impl::calculate_page_string_offsets() { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto const& page_keys = _chunk_itm_data.page_keys; - auto const& page_index = _chunk_itm_data.page_index; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; + auto const& page_keys = _pass_itm_data->page_keys; + auto const& page_index = _pass_itm_data->page_index; std::vector col_sizes(_input_columns.size(), 0L); rmm::device_uvector d_col_sizes(col_sizes.size(), _stream); diff --git a/cpp/tests/io/parquet_chunked_reader_test.cpp b/cpp/tests/io/parquet_chunked_reader_test.cpp index 9815304b965..05fb9a3ec48 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cpp +++ b/cpp/tests/io/parquet_chunked_reader_test.cpp @@ -100,11 +100,13 @@ auto write_file(std::vector>& input_columns, return std::pair{std::move(input_table), std::move(filepath)}; } -auto chunked_read(std::string const& filepath, std::size_t byte_limit) +auto chunked_read(std::string const& filepath, + std::size_t output_limit, + std::size_t input_limit = 0) { auto const read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); - auto reader = cudf::io::chunked_parquet_reader(byte_limit, read_opts); + auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts); auto num_chunks = 0; auto out_tables = std::vector>{}; @@ -950,3 +952,65 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadNullCount) EXPECT_EQ(reader.read_chunk().tbl->get_column(0).null_count(), page_limit_rows / 4); } while (reader.has_next()); } + +TEST_F(ParquetChunkedReaderTest, InputLimitSimple) +{ + auto const filepath = temp_env->get_temp_filepath("input_limit_10_rowgroups.parquet"); + + // This results in 10 grow groups, at 4001150 bytes per row group + constexpr int num_rows = 25'000'000; + auto value_iter = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i; }); + cudf::test::fixed_width_column_wrapper expected(value_iter, value_iter + num_rows); + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, + cudf::table_view{{expected}}) + // note: it is unnecessary to force compression to NONE here because the size we are using in + // the row group is the uncompressed data size. But forcing the dictionary policy to + // dictionary_policy::NEVER is necessary to prevent changes in the + // decompressed-but-not-yet-decoded data. + .dictionary_policy(cudf::io::dictionary_policy::NEVER); + + cudf::io::write_parquet(opts); + + { + // no chunking + auto const [result, num_chunks] = chunked_read(filepath, 0, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 25 chunks of 100k rows each + auto const [result, num_chunks] = chunked_read(filepath, 0, 1); + EXPECT_EQ(num_chunks, 25); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 25 chunks of 100k rows each + auto const [result, num_chunks] = chunked_read(filepath, 0, 4000000); + EXPECT_EQ(num_chunks, 25); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 25 chunks of 100k rows each + auto const [result, num_chunks] = chunked_read(filepath, 0, 4100000); + EXPECT_EQ(num_chunks, 25); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 12 chunks of 200k rows each, plus 1 final chunk of 100k rows. + auto const [result, num_chunks] = chunked_read(filepath, 0, 8002301); + EXPECT_EQ(num_chunks, 13); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 1 big chunk + auto const [result, num_chunks] = chunked_read(filepath, 0, size_t{1} * 1024 * 1024 * 1024); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } +}