diff --git a/cpp/benchmarks/io/nvbench_helpers.hpp b/cpp/benchmarks/io/nvbench_helpers.hpp index 1e3ab2b7b4f..cc548ccd3de 100644 --- a/cpp/benchmarks/io/nvbench_helpers.hpp +++ b/cpp/benchmarks/io/nvbench_helpers.hpp @@ -28,6 +28,7 @@ enum class data_type : int32_t { INTEGRAL = static_cast(type_group_id::INTEGRAL), INTEGRAL_SIGNED = static_cast(type_group_id::INTEGRAL_SIGNED), FLOAT = static_cast(type_group_id::FLOATING_POINT), + BOOL8 = static_cast(cudf::type_id::BOOL8), DECIMAL = static_cast(type_group_id::FIXED_POINT), TIMESTAMP = static_cast(type_group_id::TIMESTAMP), DURATION = static_cast(type_group_id::DURATION), @@ -44,6 +45,7 @@ NVBENCH_DECLARE_ENUM_TYPE_STRINGS( case data_type::INTEGRAL: return "INTEGRAL"; case data_type::INTEGRAL_SIGNED: return "INTEGRAL_SIGNED"; case data_type::FLOAT: return "FLOAT"; + case data_type::BOOL8: return "BOOL8"; case data_type::DECIMAL: return "DECIMAL"; case data_type::TIMESTAMP: return "TIMESTAMP"; case data_type::DURATION: return "DURATION"; diff --git a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp index ce115fd7723..b14f9cbb67e 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp @@ -114,6 +114,7 @@ void BM_parquet_read_io_compression(nvbench::state& state) { auto const d_type = get_type_or_group({static_cast(data_type::INTEGRAL), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), @@ -298,6 +299,7 @@ void BM_parquet_read_wide_tables_mixed(nvbench::state& state) using d_type_list = nvbench::enum_type_list(data_type::INTEGRAL), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), diff --git a/cpp/benchmarks/io/parquet/parquet_writer.cpp 
b/cpp/benchmarks/io/parquet/parquet_writer.cpp index 256e50f0e64..84e4b8b93c0 100644 --- a/cpp/benchmarks/io/parquet/parquet_writer.cpp +++ b/cpp/benchmarks/io/parquet/parquet_writer.cpp @@ -89,6 +89,7 @@ void BM_parq_write_io_compression( { auto const data_types = get_type_or_group({static_cast(data_type::INTEGRAL), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), @@ -143,6 +144,7 @@ void BM_parq_write_varying_options( auto const data_types = get_type_or_group({static_cast(data_type::INTEGRAL_SIGNED), static_cast(data_type::FLOAT), + static_cast(data_type::BOOL8), static_cast(data_type::DECIMAL), static_cast(data_type::TIMESTAMP), static_cast(data_type::DURATION), @@ -181,6 +183,7 @@ void BM_parq_write_varying_options( using d_type_list = nvbench::enum_type_list(dst)); } else if (dtype == INT96) { gpuOutputInt96Timestamp(s, sb, src_pos, static_cast(dst)); } else if (dtype_len == 8) { @@ -841,6 +843,33 @@ __device__ inline bool maybe_has_nulls(page_state_s* s) return run_val != s->col.max_level[lvl]; } +template +inline __device__ void bool_plain_decode(page_state_s* s, state_buf* sb, int t, int to_decode) +{ + int pos = s->dict_pos; + int const target_pos = pos + to_decode; + + while (pos < target_pos) { + int const batch_len = min(target_pos - pos, decode_block_size_t); + + if (t < batch_len) { + int const bit_pos = pos + t; + int const byte_offset = bit_pos >> 3; + int const bit_in_byte_index = bit_pos & 7; + + uint8_t const* const read_from = s->data_start + byte_offset; + bool const read_bit = (*read_from) & (1 << bit_in_byte_index); + + int const write_to_index = rolling_index(bit_pos); + sb->dict_idx[write_to_index] = read_bit; + } + + pos += batch_len; + } + + if (t == 0) { s->dict_pos = pos; } +} + template __device__ int skip_decode(stream_type& parquet_stream, int num_to_skip, int t) { @@ -872,14 +901,7 @@ __device__ int 
skip_decode(stream_type& parquet_stream, int num_to_skip, int t) * @param num_rows Maximum number of rows to read * @param error_code Error code to set if an error is encountered */ -template - typename DecodeValuesFunc> +template CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) gpuDecodePageDataGeneric(PageInfo* pages, device_span chunks, @@ -887,12 +909,33 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) size_t num_rows, kernel_error::pointer error_code) { + constexpr bool has_dict_t = (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST); + constexpr bool has_bools_t = (kernel_mask_t == decode_kernel_mask::BOOLEAN) || + (kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) || + (kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST); + constexpr bool has_nesting_t = + (kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED); + constexpr bool has_lists_t = + (kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST) || + (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST); + constexpr bool split_decode_t = + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) || + (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST); + constexpr int rolling_buf_size = decode_block_size_t * 2; constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); __shared__ __align__(16) page_state_s state_g; using state_buf_t = 
page_state_buffers_s; __shared__ __align__(16) state_buf_t state_buffers; @@ -920,32 +963,31 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. if (s->num_rows == 0) { return; } - DecodeValuesFunc decode_values; + using value_decoder_type = std::conditional_t< + split_decode_t, + decode_fixed_width_split_values_func, + decode_fixed_width_values_func>; + value_decoder_type decode_values; bool const should_process_nulls = is_nullable(s) && maybe_has_nulls(s); // shared buffer. all shared memory is suballocated out of here - constexpr int shared_rep_size = - has_lists_t - ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) - : 0; - constexpr int shared_dict_size = - has_dict_t - ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) - : 0; - constexpr int shared_def_size = - cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}); - constexpr int shared_buf_size = shared_rep_size + shared_dict_size + shared_def_size; + constexpr int rle_run_buffer_bytes = + cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}); + constexpr int shared_buf_size = + rle_run_buffer_bytes * (static_cast(has_dict_t) + static_cast(has_bools_t) + + static_cast(has_lists_t) + 1); __shared__ __align__(16) uint8_t shared_buf[shared_buf_size]; // setup all shared memory buffers - int shared_offset = 0; - rle_run* rep_runs = reinterpret_cast*>(shared_buf + shared_offset); - if constexpr (has_lists_t) { shared_offset += shared_rep_size; } - - rle_run* dict_runs = reinterpret_cast*>(shared_buf + shared_offset); - if constexpr (has_dict_t) { shared_offset += shared_dict_size; } - rle_run* def_runs = reinterpret_cast*>(shared_buf + shared_offset); + int shared_offset = 0; + auto rep_runs = reinterpret_cast(shared_buf + shared_offset); + if constexpr (has_lists_t) { shared_offset += rle_run_buffer_bytes; } + auto 
dict_runs = reinterpret_cast(shared_buf + shared_offset); + if constexpr (has_dict_t) { shared_offset += rle_run_buffer_bytes; } + auto bool_runs = reinterpret_cast(shared_buf + shared_offset); + if constexpr (has_bools_t) { shared_offset += rle_run_buffer_bytes; } + auto def_runs = reinterpret_cast(shared_buf + shared_offset); // initialize the stream decoders (requires values computed in setupLocalPageInfo) rle_stream def_decoder{def_runs}; @@ -974,6 +1016,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); } + // Use dictionary stream memory for bools + rle_stream bool_stream{bool_runs}; + bool bools_are_rle_stream = (s->dict_run == 0); + if constexpr (has_bools_t) { + if (bools_are_rle_stream) { + bool_stream.init(1, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); + } + } + __syncthreads(); + // We use two counters in the loop below: processed_count and valid_count. // - processed_count: number of values out of num_input_values that we have decoded so far. // the definition stream returns the number of total rows it has processed in each call @@ -1041,13 +1093,20 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) } __syncthreads(); - // if we have dictionary data + // if we have dictionary or bool data + // We want to limit the number of dictionary/bool items we decode, that correspond to + // the rows we have processed in this iteration that are valid. + // We know the number of valid rows to process with: next_valid_count - valid_count. if constexpr (has_dict_t) { - // We want to limit the number of dictionary items we decode, that correspond to - // the rows we have processed in this iteration that are valid. - // We know the number of valid rows to process with: next_valid_count - valid_count. 
dict_stream.decode_next(t, next_valid_count - valid_count); __syncthreads(); + } else if constexpr (has_bools_t) { + if (bools_are_rle_stream) { + bool_stream.decode_next(t, next_valid_count - valid_count); + } else { + bool_plain_decode(s, sb, t, next_valid_count - valid_count); + } + __syncthreads(); } // decode the values themselves @@ -1061,250 +1120,82 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) } // anonymous namespace -void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) -{ - constexpr int decode_block_size = 128; - - dim3 dim_block(decode_block_size, 1); - dim3 dim_grid(pages.size(), 1); // 1 threadblock per page - - if (level_type_size == 1) { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } else { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } -} +template +using kernel_tag_t = std::integral_constant; -void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) -{ - constexpr int decode_block_size = 
128; - - dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks - - if (level_type_size == 1) { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } else { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } -} +template +using int_tag_t = std::integral_constant; -void __host__ -DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) +void __host__ DecodePageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + decode_kernel_mask kernel_mask, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) { - constexpr int decode_block_size = 128; - - dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks - - if (level_type_size == 1) { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); 
- } else { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } - } else { - if (is_list) { - gpuDecodePageDataGeneric - <<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); - } else if (has_nesting) { - gpuDecodePageDataGeneric + // No template parameters on lambdas until C++20, so use type tags instead + auto launch_kernel = [&](auto block_size_tag, auto kernel_mask_tag) { + constexpr int decode_block_size = decltype(block_size_tag)::value; + constexpr decode_kernel_mask mask = decltype(kernel_mask_tag)::value; + + dim3 dim_block(decode_block_size, 1); + dim3 dim_grid(pages.size(), 1); // 1 threadblock per page + + if (level_type_size == 1) { + gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodePageDataGeneric + gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } + }; + + switch (kernel_mask) { + case decode_kernel_mask::FIXED_WIDTH_NO_DICT: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_DICT: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_DICT_NESTED: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::FIXED_WIDTH_DICT_LIST: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED: + launch_kernel(int_tag_t<128>{}, + kernel_tag_t{}); + break; + case decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST: + launch_kernel(int_tag_t<128>{}, + 
kernel_tag_t{}); + break; + case decode_kernel_mask::BOOLEAN: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::BOOLEAN_NESTED: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + case decode_kernel_mask::BOOLEAN_LIST: + launch_kernel(int_tag_t<128>{}, kernel_tag_t{}); + break; + default: CUDF_EXPECTS(false, "Kernel type not handled by this function"); break; } } diff --git a/cpp/src/io/parquet/decode_preprocess.cu b/cpp/src/io/parquet/decode_preprocess.cu index 62f1ee88036..5b9831668e6 100644 --- a/cpp/src/io/parquet/decode_preprocess.cu +++ b/cpp/src/io/parquet/decode_preprocess.cu @@ -343,8 +343,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) bool has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0; // the level stream decoders - __shared__ rle_run def_runs[rle_run_buffer_size]; - __shared__ rle_run rep_runs[rle_run_buffer_size]; + __shared__ rle_run def_runs[rle_run_buffer_size]; + __shared__ rle_run rep_runs[rle_run_buffer_size]; rle_stream decoders[level_type::NUM_LEVEL_TYPES] = {{def_runs}, {rep_runs}}; diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 52d53cb8225..a8a8c441a84 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -181,9 +181,13 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (is_string_col(chunk)) { // check for string before byte_stream_split so FLBA will go to the right kernel return decode_kernel_mask::STRING; + } else if (is_boolean(chunk)) { + return is_list(chunk) ? decode_kernel_mask::BOOLEAN_LIST + : is_nested(chunk) ? decode_kernel_mask::BOOLEAN_NESTED + : decode_kernel_mask::BOOLEAN; } - if (!is_byte_array(chunk) && !is_boolean(chunk)) { + if (!is_byte_array(chunk)) { if (page.encoding == Encoding::PLAIN) { return is_list(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST : is_nested(chunk) ? 
decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index ca74a1c2ba0..5ece3a54892 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -618,8 +618,8 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); // the level stream decoders - __shared__ rle_run def_runs[rle_run_buffer_size]; - __shared__ rle_run rep_runs[rle_run_buffer_size]; + __shared__ rle_run def_runs[rle_run_buffer_size]; + __shared__ rle_run rep_runs[rle_run_buffer_size]; rle_stream decoders[level_type::NUM_LEVEL_TYPES] = {{def_runs}, {rep_runs}}; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index dba24b553e6..3b4d0e6dc80 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -224,6 +224,9 @@ enum class decode_kernel_mask { FIXED_WIDTH_NO_DICT_LIST = (1 << 13), // Run decode kernel for fixed width non-dictionary pages BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST = (1 << 14), // Run decode kernel for BYTE_STREAM_SPLIT encoded data for fixed width lists + BOOLEAN = (1 << 15), // Run decode kernel for boolean data + BOOLEAN_NESTED = (1 << 16), // Run decode kernel for nested boolean data + BOOLEAN_LIST = (1 << 17), // Run decode kernel for list boolean data }; // mask representing all the ways in which a string can be encoded @@ -539,7 +542,7 @@ enum class encode_kernel_mask { DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel - BYTE_STREAM_SPLIT = (1 << 5), // Run plain encoding kernel, but split streams + BYTE_STREAM_SPLIT = (1 << 5) // Run plain encoding kernel, but split streams }; /** @@ -911,72 +914,18 @@ void 
DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] has_nesting Whether or not the data contains nested (but not list) data. - * @param[in] is_list Whether or not the data contains list data. + * @param[in] kernel_mask Mask indicating the type of decoding kernel to launch. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ -void DecodePageDataFixed(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); - -/** - * @brief Launches kernel for reading dictionary fixed width column data stored in the pages - * - * The page data will be written to the output pointed to in the page's - * associated column chunk. - * - * @param[in,out] pages All pages to be decoded - * @param[in] chunks All chunks to be decoded - * @param[in] num_rows Total number of rows to read - * @param[in] min_row Minimum number of rows to read - * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] has_nesting Whether or not the data contains nested (but not list) data. - * @param[in] is_list Whether or not the data contains list data. 
- * @param[out] error_code Error code for kernel failures - * @param[in] stream CUDA stream to use - */ -void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); - -/** - * @brief Launches kernel for reading fixed width column data stored in the pages - * - * The page data will be written to the output pointed to in the page's - * associated column chunk. - * - * @param[in,out] pages All pages to be decoded - * @param[in] chunks All chunks to be decoded - * @param[in] num_rows Total number of rows to read - * @param[in] min_row Minimum number of rows to read - * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] has_nesting Whether or not the data contains nested (but not list) data. - * @param[in] is_list Whether or not the data contains list data. 
- * @param[out] error_code Error code for kernel failures - * @param[in] stream CUDA stream to use - */ -void DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - bool has_nesting, - bool is_list, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); +void DecodePageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + decode_kernel_mask kernel_mask, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for initializing encoder row group fragments diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 689386b8957..cfbb88cd80e 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -219,8 +219,20 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num int const nkernels = std::bitset<32>(kernel_mask).count(); auto streams = cudf::detail::fork_streams(_stream, nkernels); - // launch string decoder int s_idx = 0; + + auto decode_data = [&](decode_kernel_mask decoder_mask) { + DecodePageData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + decoder_mask, + error_code.data(), + streams[s_idx++]); + }; + + // launch string decoder if (BitAnd(kernel_mask, decode_kernel_mask::STRING) != 0) { DecodeStringPageData(subpass.pages, pass.chunks, @@ -266,41 +278,17 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num // launch byte stream split decoder if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) != 0) { - DecodeSplitPageFixedWidthData(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - false, - false, - error_code.data(), - streams[s_idx++]); + 
decode_data(decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT); } // launch byte stream split decoder, for nested columns if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) != 0) { - DecodeSplitPageFixedWidthData(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED); } // launch byte stream split decoder, for list columns if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST) != 0) { - DecodeSplitPageFixedWidthData(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - true, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST); } // launch byte stream split decoder @@ -316,80 +304,47 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num // launch fixed width type decoder if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0) { - DecodePageDataFixed(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - false, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_NO_DICT); } // launch fixed width type decoder for lists if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) != 0) { - DecodePageDataFixed(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - true, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST); } // launch fixed width type decoder, for nested columns if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) != 0) { - DecodePageDataFixed(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED); + } + + 
// launch boolean type decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BOOLEAN) != 0) { + decode_data(decode_kernel_mask::BOOLEAN); + } + + // launch boolean type decoder, for nested columns + if (BitAnd(kernel_mask, decode_kernel_mask::BOOLEAN_NESTED) != 0) { + decode_data(decode_kernel_mask::BOOLEAN_NESTED); + } + + // launch boolean type decoder, for list columns + if (BitAnd(kernel_mask, decode_kernel_mask::BOOLEAN_LIST) != 0) { + decode_data(decode_kernel_mask::BOOLEAN_LIST); } // launch fixed width type decoder with dictionaries if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT) != 0) { - DecodePageDataFixedDict(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - false, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_DICT); } // launch fixed width type decoder with dictionaries for lists if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_LIST) != 0) { - DecodePageDataFixedDict(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - true, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_DICT_LIST); } // launch fixed width type decoder with dictionaries, for nested columns if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) != 0) { - DecodePageDataFixedDict(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - true, - false, - error_code.data(), - streams[s_idx++]); + decode_data(decode_kernel_mask::FIXED_WIDTH_DICT_NESTED); } // launch the catch-all page decoder diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 69e783a89d0..3c49de0c997 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -152,7 +152,6 @@ __device__ inline void decode(level_t* const output, } // a single rle run. 
may be broken up into multiple rle_batches -template struct rle_run { int size; // total size of the run int output_pos; // absolute position of this run w.r.t output @@ -183,14 +182,14 @@ struct rle_stream { level_t* output; - rle_run* runs; + rle_run* runs; int output_pos; int fill_index; int decode_index; - __device__ rle_stream(rle_run* _runs) : runs(_runs) {} + __device__ rle_stream(rle_run* _runs) : runs(_runs) {} __device__ inline bool is_last_decode_warp(int warp_id) { @@ -217,7 +216,7 @@ struct rle_stream { decode_index = -1; // signals the first iteration. Nothing to decode. } - __device__ inline int get_rle_run_info(rle_run& run) + __device__ inline int get_rle_run_info(rle_run& run) { run.start = cur; run.level_run = get_vlq32(run.start, end); @@ -384,7 +383,7 @@ struct rle_stream { // started basically we're setting up the rle_stream vars necessary to start fill_run_batch for // the first time while (cur < end) { - rle_run run; + rle_run run; int run_bytes = get_rle_run_info(run); if ((output_pos + run.size) > target_count) {