From ddfb2848d6b7bb3cd03b8377f349f401030f558c Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Tue, 29 Oct 2024 09:51:19 -0700 Subject: [PATCH 1/7] Support storing `precision` of decimal types in `Schema` class (#17176) In Spark, the `DecimalType` has a specific number of digits to represent the numbers. However, when creating a data Schema, only type and name of the column are stored, thus we lose that precision information. As such, it would be difficult to reconstruct the original decimal types from cudf's `Schema` instance. This PR adds a `precision` member variable to the `Schema` class in cudf Java, allowing it to store the precision number of the original decimal column. Partially contributes to https://github.com/NVIDIA/spark-rapids/issues/11560. Authors: - Nghia Truong (https://github.com/ttnghia) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) URL: https://github.com/rapidsai/cudf/pull/17176 --- java/src/main/java/ai/rapids/cudf/Schema.java | 77 +++++++++++++++++-- 1 file changed, 70 insertions(+), 7 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/Schema.java b/java/src/main/java/ai/rapids/cudf/Schema.java index 76b2799aad6..6da591d659f 100644 --- a/java/src/main/java/ai/rapids/cudf/Schema.java +++ b/java/src/main/java/ai/rapids/cudf/Schema.java @@ -29,26 +29,52 @@ public class Schema { public static final Schema INFERRED = new Schema(); private final DType topLevelType; + + /** + * Default value for precision value, when it is not specified or the column type is not decimal. + */ + private static final int UNKNOWN_PRECISION = -1; + + /** + * Store precision for the top level column, only applicable if the column is a decimal type. + *

+ * This variable is not designed to be used by any libcudf's APIs since libcudf does not support + * precisions for fixed point numbers. + * Instead, it is used only to pass down the precision values from Spark's DecimalType to the + * JNI level, where some JNI functions require these values to perform their operations. + */ + private final int topLevelPrecision; + private final List childNames; private final List childSchemas; private boolean flattened = false; private String[] flattenedNames; private DType[] flattenedTypes; + private int[] flattenedPrecisions; private int[] flattenedCounts; private Schema(DType topLevelType, + int topLevelPrecision, List childNames, List childSchemas) { this.topLevelType = topLevelType; + this.topLevelPrecision = topLevelPrecision; this.childNames = childNames; this.childSchemas = childSchemas; } + private Schema(DType topLevelType, + List childNames, + List childSchemas) { + this(topLevelType, UNKNOWN_PRECISION, childNames, childSchemas); + } + /** * Inferred schema. */ private Schema() { topLevelType = null; + topLevelPrecision = UNKNOWN_PRECISION; childNames = null; childSchemas = null; } @@ -104,14 +130,17 @@ private void flattenIfNeeded() { if (flatLen == 0) { flattenedNames = null; flattenedTypes = null; + flattenedPrecisions = null; flattenedCounts = null; } else { String[] names = new String[flatLen]; DType[] types = new DType[flatLen]; + int[] precisions = new int[flatLen]; int[] counts = new int[flatLen]; - collectFlattened(names, types, counts, 0); + collectFlattened(names, types, precisions, counts, 0); flattenedNames = names; flattenedTypes = types; + flattenedPrecisions = precisions; flattenedCounts = counts; } flattened = true; @@ -128,19 +157,20 @@ private int flattenedLength(int startingLength) { return startingLength; } - private int collectFlattened(String[] names, DType[] types, int[] counts, int offset) { + private int collectFlattened(String[] names, DType[] types, int[] precisions, int[] counts, int offset) { if (childSchemas != null) { for (int i = 0; i < childSchemas.size(); i++) { Schema child = childSchemas.get(i); names[offset] = childNames.get(i); types[offset] = child.topLevelType; + precisions[offset] = child.topLevelPrecision; if (child.childNames != null) { counts[offset] = child.childNames.size(); } else { counts[offset] = 0; } offset++; - offset = this.childSchemas.get(i).collectFlattened(names, types, counts, offset); + offset = this.childSchemas.get(i).collectFlattened(names, types, precisions, counts, offset); } } return offset; @@ -226,6 +256,22 @@ public int[] getFlattenedTypeScales() { return ret; } + /** + * Get decimal precisions of the columns' types flattened from all levels in schema by + * depth-first traversal. + *

+ * This is used to pass down the decimal precisions from Spark to only the JNI layer, where + * some JNI functions require precision values to perform their operations. + * Decimal precisions should not be consumed by any libcudf's APIs since libcudf does not + * support precisions for fixed point numbers. + * + * @return An array containing decimal precision of all columns in schema. + */ + public int[] getFlattenedDecimalPrecisions() { + flattenIfNeeded(); + return flattenedPrecisions; + } + /** * Get the types of the columns in schema flattened from all levels by depth-first traversal. * @return An array containing types of all columns in schema. @@ -307,11 +353,13 @@ public HostColumnVector.DataType asHostDataType() { public static class Builder { private final DType topLevelType; + private final int topLevelPrecision; private final List names; private final List types; - private Builder(DType topLevelType) { + private Builder(DType topLevelType, int topLevelPrecision) { this.topLevelType = topLevelType; + this.topLevelPrecision = topLevelPrecision; if (topLevelType == DType.STRUCT || topLevelType == DType.LIST) { // There can be children names = new ArrayList<>(); @@ -322,14 +370,19 @@ private Builder(DType topLevelType) { } } + private Builder(DType topLevelType) { + this(topLevelType, UNKNOWN_PRECISION); + } + /** * Add a new column * @param type the type of column to add * @param name the name of the column to add (Ignored for list types) + * @param precision the decimal precision, only applicable for decimal types * @return the builder for the new column. This should really only be used when the type * passed in is a LIST or a STRUCT. */ - public Builder addColumn(DType type, String name) { + public Builder addColumn(DType type, String name, int precision) { if (names == null) { throw new IllegalStateException("A column of type " + topLevelType + " cannot have children"); @@ -340,21 +393,31 @@ public Builder addColumn(DType type, String name) { if (names.contains(name)) { throw new IllegalStateException("Cannot add duplicate names to a schema"); } - Builder ret = new Builder(type); + Builder ret = new Builder(type, precision); types.add(ret); names.add(name); return ret; } + public Builder addColumn(DType type, String name) { + return addColumn(type, name, UNKNOWN_PRECISION); + } + /** * Adds a single column to the current schema. addColumn is preferred as it can be used * to support nested types. * @param type the type of the column. * @param name the name of the column. + * @param precision the decimal precision, only applicable for decimal types. * @return this for chaining. */ + public Builder column(DType type, String name, int precision) { + addColumn(type, name, precision); + return this; + } + public Builder column(DType type, String name) { - addColumn(type, name); + addColumn(type, name, UNKNOWN_PRECISION); return this; } From 63b773e73a9f582e2cfa75ae04bcad8608e8f03a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 29 Oct 2024 13:25:55 -0500 Subject: [PATCH 2/7] Add in new java API for raw host memory allocation (#17197) This is the first patch in a series of patches that should make it so that all java host memory allocations go through the DefaultHostMemoryAllocator unless another allocator is explicitly provided. This is to make it simpler to track/control host memory usage. Authors: - Robert (Bobby) Evans (https://github.com/revans2) Approvers: - Jason Lowe (https://github.com/jlowe) - Alessandro Bellina (https://github.com/abellina) URL: https://github.com/rapidsai/cudf/pull/17197 --- .../main/java/ai/rapids/cudf/HostMemoryBuffer.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java b/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java index e4106574a19..d792459901c 100644 --- a/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java +++ b/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -155,6 +155,16 @@ public static HostMemoryBuffer allocate(long bytes) { return allocate(bytes, defaultPreferPinned); } + /** + * Allocate host memory bypassing the default allocator. This is intended to only be used by other allocators. + * Pinned memory will not be used for these allocations. + * @param bytes size in bytes to allocate + * @return the newly created buffer + */ + public static HostMemoryBuffer allocateRaw(long bytes) { + return new HostMemoryBuffer(UnsafeMemoryAccessor.allocate(bytes), bytes); + } + /** * Create a host buffer that is memory-mapped to a file. * @param path path to the file to map into host memory From 52d7e638af366a2384868c41a7ece889d7ada30e Mon Sep 17 00:00:00 2001 From: Basit Ayantunde Date: Tue, 29 Oct 2024 19:59:13 +0000 Subject: [PATCH 3/7] Unified binary_ops and ast benchmarks parameter names (#17200) This merge request unifies the parameter names of the AST and BINARYOP benchmark suites and makes it easier to perform parameter sweeps and compare the outputs of both benchmarks. Authors: - Basit Ayantunde (https://github.com/lamarrr) Approvers: - Nghia Truong (https://github.com/ttnghia) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/17200 --- cpp/benchmarks/ast/transform.cpp | 26 +++++++++---------- cpp/benchmarks/binaryop/binaryop.cpp | 26 +++++++++---------- cpp/benchmarks/binaryop/compiled_binaryop.cpp | 12 ++++----- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/cpp/benchmarks/ast/transform.cpp b/cpp/benchmarks/ast/transform.cpp index 7fe61054a26..2533ea9611c 100644 --- a/cpp/benchmarks/ast/transform.cpp +++ b/cpp/benchmarks/ast/transform.cpp @@ -52,14 +52,14 @@ enum class TreeType { template static void BM_ast_transform(nvbench::state& state) { - auto const table_size = static_cast(state.get_int64("table_size")); + auto const num_rows = static_cast(state.get_int64("num_rows")); auto const tree_levels = static_cast(state.get_int64("tree_levels")); // Create table data auto const n_cols = reuse_columns ? 1 : tree_levels + 1; auto const source_table = create_sequence_table(cycle_dtypes({cudf::type_to_id()}, n_cols), - row_count{table_size}, + row_count{num_rows}, Nullable ? std::optional{0.5} : std::nullopt); auto table = source_table->view(); @@ -99,8 +99,8 @@ static void BM_ast_transform(nvbench::state& state) auto const& expression_tree_root = expressions.back(); // Use the number of bytes read from global memory - state.add_global_memory_reads(static_cast(table_size) * (tree_levels + 1)); - state.add_global_memory_writes(table_size); + state.add_global_memory_reads(static_cast(num_rows) * (tree_levels + 1)); + state.add_global_memory_writes(num_rows); state.exec(nvbench::exec_tag::sync, [&](nvbench::launch&) { cudf::compute_column(table, expression_tree_root); }); @@ -109,15 +109,15 @@ static void BM_ast_transform(nvbench::state& state) template static void BM_string_compare_ast_transform(nvbench::state& state) { - auto const string_width = static_cast(state.get_int64("string_width")); - auto const num_rows = static_cast(state.get_int64("num_rows")); - auto const num_comparisons = static_cast(state.get_int64("num_comparisons")); - auto const hit_rate = static_cast(state.get_int64("hit_rate")); + auto const string_width = static_cast(state.get_int64("string_width")); + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const tree_levels = static_cast(state.get_int64("tree_levels")); + auto const hit_rate = static_cast(state.get_int64("hit_rate")); - CUDF_EXPECTS(num_comparisons > 0, "benchmarks require 1 or more comparisons"); + CUDF_EXPECTS(tree_levels > 0, "benchmarks require 1 or more comparisons"); // Create table data - auto const num_cols = num_comparisons * 2; + auto const num_cols = tree_levels * 2; std::vector> columns; std::for_each( thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_cols), [&](size_t) { @@ -150,7 +150,7 @@ static void BM_string_compare_ast_transform(nvbench::state& state) expressions.emplace_back(cudf::ast::operation(cmp_op, column_refs[0], column_refs[1])); std::for_each(thrust::make_counting_iterator(1), - thrust::make_counting_iterator(num_comparisons), + thrust::make_counting_iterator(tree_levels), [&](size_t idx) { auto const& lhs = expressions.back(); auto const& rhs = expressions.emplace_back( @@ -177,7 +177,7 @@ static void BM_string_compare_ast_transform(nvbench::state& state) NVBENCH_BENCH(name) \ .set_name(#name) \ .add_int64_axis("tree_levels", {1, 5, 10}) \ - .add_int64_axis("table_size", {100'000, 1'000'000, 10'000'000, 100'000'000}) + .add_int64_axis("num_rows", {100'000, 1'000'000, 10'000'000, 100'000'000}) AST_TRANSFORM_BENCHMARK_DEFINE( ast_int32_imbalanced_unique, int32_t, TreeType::IMBALANCED_LEFT, false, false); @@ -202,7 +202,7 @@ AST_TRANSFORM_BENCHMARK_DEFINE( .set_name(#name) \ .add_int64_axis("string_width", {32, 64, 128, 256}) \ .add_int64_axis("num_rows", {32768, 262144, 2097152}) \ - .add_int64_axis("num_comparisons", {1, 2, 3, 4}) \ + .add_int64_axis("tree_levels", {1, 2, 3, 4}) \ .add_int64_axis("hit_rate", {50, 100}) AST_STRING_COMPARE_TRANSFORM_BENCHMARK_DEFINE(ast_string_equal_logical_and, diff --git a/cpp/benchmarks/binaryop/binaryop.cpp b/cpp/benchmarks/binaryop/binaryop.cpp index 35e41c6c2a4..75c91d270a7 100644 --- a/cpp/benchmarks/binaryop/binaryop.cpp +++ b/cpp/benchmarks/binaryop/binaryop.cpp @@ -40,18 +40,18 @@ enum class TreeType { template static void BM_binaryop_transform(nvbench::state& state) { - auto const table_size{static_cast(state.get_int64("table_size"))}; + auto const num_rows{static_cast(state.get_int64("num_rows"))}; auto const tree_levels{static_cast(state.get_int64("tree_levels"))}; // Create table data auto const n_cols = reuse_columns ? 1 : tree_levels + 1; auto const source_table = create_sequence_table( - cycle_dtypes({cudf::type_to_id()}, n_cols), row_count{table_size}); + cycle_dtypes({cudf::type_to_id()}, n_cols), row_count{num_rows}); cudf::table_view table{*source_table}; // Use the number of bytes read from global memory - state.add_global_memory_reads(static_cast(table_size) * (tree_levels + 1)); - state.add_global_memory_writes(table_size); + state.add_global_memory_reads(static_cast(num_rows) * (tree_levels + 1)); + state.add_global_memory_writes(num_rows); state.exec(nvbench::exec_tag::sync, [&](nvbench::launch&) { // Execute tree that chains additions like (((a + b) + c) + d) @@ -74,15 +74,15 @@ static void BM_binaryop_transform(nvbench::state& state) template static void BM_string_compare_binaryop_transform(nvbench::state& state) { - auto const string_width = static_cast(state.get_int64("string_width")); - auto const num_rows = static_cast(state.get_int64("num_rows")); - auto const num_comparisons = static_cast(state.get_int64("num_comparisons")); - auto const hit_rate = static_cast(state.get_int64("hit_rate")); + auto const string_width = static_cast(state.get_int64("string_width")); + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const tree_levels = static_cast(state.get_int64("tree_levels")); + auto const hit_rate = static_cast(state.get_int64("hit_rate")); - CUDF_EXPECTS(num_comparisons > 0, "benchmarks require 1 or more comparisons"); + CUDF_EXPECTS(tree_levels > 0, "benchmarks require 1 or more comparisons"); // Create table data - auto const num_cols = num_comparisons * 2; + auto const num_cols = tree_levels * 2; std::vector> columns; std::for_each( thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_cols), [&](size_t) { @@ -113,7 +113,7 @@ static void BM_string_compare_binaryop_transform(nvbench::state& state) cudf::binary_operation(table.get_column(0), table.get_column(1), cmp_op, bool_type, stream); std::for_each( thrust::make_counting_iterator(1), - thrust::make_counting_iterator(num_comparisons), + thrust::make_counting_iterator(tree_levels), [&](size_t idx) { std::unique_ptr comparison = cudf::binary_operation( table.get_column(idx * 2), table.get_column(idx * 2 + 1), cmp_op, bool_type, stream); @@ -133,7 +133,7 @@ static void BM_string_compare_binaryop_transform(nvbench::state& state) } \ NVBENCH_BENCH(name) \ .add_int64_axis("tree_levels", {1, 2, 5, 10}) \ - .add_int64_axis("table_size", {100'000, 1'000'000, 10'000'000, 100'000'000}) + .add_int64_axis("num_rows", {100'000, 1'000'000, 10'000'000, 100'000'000}) BINARYOP_TRANSFORM_BENCHMARK_DEFINE(binaryop_int32_imbalanced_unique, int32_t, @@ -158,7 +158,7 @@ BINARYOP_TRANSFORM_BENCHMARK_DEFINE(binaryop_double_imbalanced_unique, .set_name(#name) \ .add_int64_axis("string_width", {32, 64, 128, 256}) \ .add_int64_axis("num_rows", {32768, 262144, 2097152}) \ - .add_int64_axis("num_comparisons", {1, 2, 3, 4}) \ + .add_int64_axis("tree_levels", {1, 2, 3, 4}) \ .add_int64_axis("hit_rate", {50, 100}) STRING_COMPARE_BINARYOP_TRANSFORM_BENCHMARK_DEFINE(string_compare_binaryop_transform, diff --git a/cpp/benchmarks/binaryop/compiled_binaryop.cpp b/cpp/benchmarks/binaryop/compiled_binaryop.cpp index cd3c3871a2e..426f44a4fa1 100644 --- a/cpp/benchmarks/binaryop/compiled_binaryop.cpp +++ b/cpp/benchmarks/binaryop/compiled_binaryop.cpp @@ -23,10 +23,10 @@ template void BM_compiled_binaryop(nvbench::state& state, cudf::binary_operator binop) { - auto const table_size = static_cast(state.get_int64("table_size")); + auto const num_rows = static_cast(state.get_int64("num_rows")); auto const source_table = create_random_table( - {cudf::type_to_id(), cudf::type_to_id()}, row_count{table_size}); + {cudf::type_to_id(), cudf::type_to_id()}, row_count{num_rows}); auto lhs = cudf::column_view(source_table->get_column(0)); auto rhs = cudf::column_view(source_table->get_column(1)); @@ -37,9 +37,9 @@ void BM_compiled_binaryop(nvbench::state& state, cudf::binary_operator binop) cudf::binary_operation(lhs, rhs, binop, output_dtype); // use number of bytes read and written to global memory - state.add_global_memory_reads(table_size); - state.add_global_memory_reads(table_size); - state.add_global_memory_writes(table_size); + state.add_global_memory_reads(num_rows); + state.add_global_memory_reads(num_rows); + state.add_global_memory_writes(num_rows); state.exec(nvbench::exec_tag::sync, [&](nvbench::launch&) { cudf::binary_operation(lhs, rhs, binop, output_dtype); }); @@ -55,7 +55,7 @@ void BM_compiled_binaryop(nvbench::state& state, cudf::binary_operator binop) } \ NVBENCH_BENCH(name) \ .set_name("compiled_binary_op_" BM_STRINGIFY(name)) \ - .add_int64_axis("table_size", {10'000, 100'000, 1'000'000, 10'000'000, 100'000'000}) + .add_int64_axis("num_rows", {10'000, 100'000, 1'000'000, 10'000'000, 100'000'000}) #define build_name(a, b, c, d) a##_##b##_##c##_##d From 8d7b0d8bf0aebebde0a5036d2e51f5991ecbe63b Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:31:27 -0400 Subject: [PATCH 4/7] [BUG] Replace `repo_token` with `github_token` in Auto Assign PR GHA (#17203) The Auto Assign GHA workflow fails with this [error](https://github.com/rapidsai/cudf/actions/runs/11580081781). This PR fixes this error. xref #16969 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/17203 --- .github/workflows/auto-assign.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/auto-assign.yml b/.github/workflows/auto-assign.yml index 673bebd4ecc..1bf4ac08b69 100644 --- a/.github/workflows/auto-assign.yml +++ b/.github/workflows/auto-assign.yml @@ -13,5 +13,5 @@ jobs: steps: - uses: actions-ecosystem/action-add-assignees@v1 with: - repo_token: "${{ secrets.GITHUB_TOKEN }}" + github_token: "${{ secrets.GITHUB_TOKEN }}" assignees: ${{ github.actor }} From eeb4d2780163794f4b705062e49dbdc3283ebce0 Mon Sep 17 00:00:00 2001 From: Paul Mattione <156858817+pmattione-nvidia@users.noreply.github.com> Date: Tue, 29 Oct 2024 17:12:43 -0400 Subject: [PATCH 5/7] Parquet reader list microkernel (#16538) This PR refactors fixed-width parquet list reader decoding into its own set of micro-kernels, templatizing the existing fixed-width microkernels. When skipping rows for lists, this will skip ahead the decoding of the definition, repetition, and dictionary rle_streams as well. The list kernel uses 128 threads per block and 71 registers per thread, so I've changed the launch_bounds to enforce a minimum of 8 blocks per SM. This causes a small register spill but the benchmarks are still faster, as seen below: DEVICE_BUFFER list benchmarks (decompress + decode, not bound by IO): run_length 1, cardinality 0, no byte_limit: 24.7% faster run_length 32, cardinality 1000, no byte_limit: 18.3% faster run_length 1, cardinality 0, 500kb byte_limit: 57% faster run_length 32, cardinality 1000, 500kb byte_limit: 53% faster Compressed list of ints on hard drive: 5.5% faster Sample real data on hard drive (many columns not lists): 0.5% faster Authors: - Paul Mattione (https://github.com/pmattione-nvidia) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - https://github.com/nvdbaranec - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/16538 --- cpp/src/io/parquet/decode_fixed.cu | 585 ++++++++++++++++++++++++----- cpp/src/io/parquet/page_hdr.cu | 17 +- cpp/src/io/parquet/parquet_gpu.hpp | 10 + cpp/src/io/parquet/reader_impl.cpp | 45 +++ cpp/src/io/parquet/rle_stream.cuh | 81 ++-- 5 files changed, 615 insertions(+), 123 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 4522ea7fe56..45380e6ea20 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -37,7 +37,14 @@ struct block_scan_results { }; template -static __device__ void scan_block_exclusive_sum(int thread_bit, block_scan_results& results) +using block_scan_temp_storage = int[decode_block_size / cudf::detail::warp_size]; + +// Similar to CUB, must __syncthreads() after calling if reusing temp_storage +template +__device__ inline static void scan_block_exclusive_sum( + int thread_bit, + block_scan_results& results, + block_scan_temp_storage& temp_storage) { int const t = threadIdx.x; int const warp_index = t / cudf::detail::warp_size; @@ -45,15 +52,19 @@ static __device__ void scan_block_exclusive_sum(int thread_bit, block_scan_resul uint32_t const lane_mask = (uint32_t(1) << warp_lane) - 1; uint32_t warp_bits = ballot(thread_bit); - scan_block_exclusive_sum(warp_bits, warp_lane, warp_index, lane_mask, results); + scan_block_exclusive_sum( + warp_bits, warp_lane, warp_index, lane_mask, results, temp_storage); } +// Similar to CUB, must __syncthreads() after calling if reusing temp_storage template -__device__ static void scan_block_exclusive_sum(uint32_t warp_bits, - int warp_lane, - int warp_index, - uint32_t lane_mask, - block_scan_results& results) +__device__ static void scan_block_exclusive_sum( + uint32_t warp_bits, + int warp_lane, + int warp_index, + uint32_t lane_mask, + block_scan_results& results, + block_scan_temp_storage& temp_storage) { // Compute # warps constexpr int num_warps = decode_block_size / cudf::detail::warp_size; @@ -64,49 +75,64 @@ __device__ static void scan_block_exclusive_sum(uint32_t warp_bits, results.thread_count_within_warp = __popc(results.warp_bits & lane_mask); // Share the warp counts amongst the block threads - __shared__ int warp_counts[num_warps]; - if (warp_lane == 0) { warp_counts[warp_index] = results.warp_count; } - __syncthreads(); + if (warp_lane == 0) { temp_storage[warp_index] = results.warp_count; } + __syncthreads(); // Sync to share counts between threads/warps // Compute block-wide results results.block_count = 0; results.thread_count_within_block = results.thread_count_within_warp; for (int warp_idx = 0; warp_idx < num_warps; ++warp_idx) { - results.block_count += warp_counts[warp_idx]; - if (warp_idx < warp_index) { results.thread_count_within_block += warp_counts[warp_idx]; } + results.block_count += temp_storage[warp_idx]; + if (warp_idx < warp_index) { results.thread_count_within_block += temp_storage[warp_idx]; } } } -template -__device__ inline void gpuDecodeFixedWidthValues( +template +__device__ void gpuDecodeFixedWidthValues( page_state_s* s, state_buf* const sb, int start, int end, int t) { constexpr int num_warps = block_size / cudf::detail::warp_size; constexpr int max_batch_size = num_warps * cudf::detail::warp_size; - PageNestingDecodeInfo* nesting_info_base = s->nesting_info; - int const dtype = s->col.physical_type; + // nesting level that is storing actual leaf values + int const leaf_level_index = s->col.max_nesting_depth - 1; + auto const data_out = s->nesting_info[leaf_level_index].data_out; + + int const dtype = s->col.physical_type; + uint32_t const dtype_len = s->dtype_len; + + int const skipped_leaf_values = s->page.skipped_leaf_values; // decode values int pos = start; while (pos < end) { int const batch_size = min(max_batch_size, end - pos); - int const target_pos = pos + batch_size; - int const src_pos = pos + t; + int const thread_pos = pos + t; - // the position in the output column/buffer - int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; + // Index from value buffer (doesn't include nulls) to final array (has gaps for nulls) + int const dst_pos = [&]() { + int dst_pos = sb->nz_idx[rolling_index(thread_pos)]; + if constexpr (!has_lists_t) { dst_pos -= s->first_row; } + return dst_pos; + }(); // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values // before first_row) in the flat hierarchy case. - if (src_pos < target_pos && dst_pos >= 0) { + if (thread_pos < target_pos && dst_pos >= 0) { // nesting level that is storing actual leaf values - int const leaf_level_index = s->col.max_nesting_depth - 1; - uint32_t dtype_len = s->dtype_len; - void* dst = - nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + // src_pos represents the logical row position we want to read from. But in the case of + // nested hierarchies (lists), there is no 1:1 mapping of rows to values. So src_pos + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. + int const src_pos = [&]() { + if constexpr (has_lists_t) { return thread_pos + skipped_leaf_values; } + return thread_pos; + }(); + + void* const dst = data_out + (static_cast(dst_pos) * dtype_len); + if (s->col.logical_type.has_value() && s->col.logical_type->type == LogicalType::DECIMAL) { switch (dtype) { case INT32: gpuOutputFast(s, sb, src_pos, static_cast(dst)); break; @@ -145,15 +171,15 @@ __device__ inline void gpuDecodeFixedWidthValues( } } -template +template struct decode_fixed_width_values_func { __device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t) { - gpuDecodeFixedWidthValues(s, sb, start, end, t); + gpuDecodeFixedWidthValues(s, sb, start, end, t); } }; -template +template __device__ inline void gpuDecodeFixedWidthSplitValues( page_state_s* s, state_buf* const sb, int start, int end, int t) { @@ -161,10 +187,15 @@ __device__ inline void gpuDecodeFixedWidthSplitValues( constexpr int num_warps = block_size / warp_size; constexpr int max_batch_size = num_warps * warp_size; - PageNestingDecodeInfo* nesting_info_base = s->nesting_info; - int const dtype = s->col.physical_type; - auto const data_len = thrust::distance(s->data_start, s->data_end); - auto const num_values = data_len / s->dtype_len_in; + // nesting level that is storing actual leaf values + int const leaf_level_index = s->col.max_nesting_depth - 1; + auto const data_out = s->nesting_info[leaf_level_index].data_out; + + int const dtype = s->col.physical_type; + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + + int const skipped_leaf_values = s->page.skipped_leaf_values; // decode values int pos = start; @@ -172,21 +203,34 @@ __device__ inline void gpuDecodeFixedWidthSplitValues( int const batch_size = min(max_batch_size, end - pos); int const target_pos = pos + batch_size; - int const src_pos = pos + t; + int const thread_pos = pos + t; // the position in the output column/buffer - int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; + // Index from value buffer (doesn't include nulls) to final array (has gaps for nulls) + int const dst_pos = [&]() { + int dst_pos = sb->nz_idx[rolling_index(thread_pos)]; + if constexpr (!has_lists_t) { dst_pos -= s->first_row; } + return dst_pos; + }(); // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values // before first_row) in the flat hierarchy case. - if (src_pos < target_pos && dst_pos >= 0) { - // nesting level that is storing actual leaf values - int const leaf_level_index = s->col.max_nesting_depth - 1; + if (thread_pos < target_pos && dst_pos >= 0) { + // src_pos represents the logical row position we want to read from. But in the case of + // nested hierarchies (lists), there is no 1:1 mapping of rows to values. So src_pos + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. + int const src_pos = [&]() { + if constexpr (has_lists_t) { + return thread_pos + skipped_leaf_values; + } else { + return thread_pos; + } + }(); - uint32_t dtype_len = s->dtype_len; - uint8_t const* src = s->data_start + src_pos; - uint8_t* dst = - nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + uint32_t const dtype_len = s->dtype_len; + uint8_t const* const src = s->data_start + src_pos; + uint8_t* const dst = data_out + static_cast(dst_pos) * dtype_len; auto const is_decimal = s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; @@ -239,11 +283,11 @@ __device__ inline void gpuDecodeFixedWidthSplitValues( } } -template +template struct decode_fixed_width_split_values_func { __device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t) { - gpuDecodeFixedWidthSplitValues(s, sb, start, end, t); + gpuDecodeFixedWidthSplitValues(s, sb, start, end, t); } }; @@ -274,12 +318,14 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested( int const batch_size = min(max_batch_size, capped_target_value_count - value_count); // definition level - int d = 1; - if (t >= batch_size) { - d = -1; - } else if (def) { - d = static_cast(def[rolling_index(value_count + t)]); - } + int const d = [&]() { + if (t >= batch_size) { + return -1; + } else if (def) { + return static_cast(def[rolling_index(value_count + t)]); + } + return 1; + }(); int const thread_value_count = t; int const block_value_count = batch_size; @@ -340,6 +386,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested( if (is_valid) { int const dst_pos = value_count + thread_value_count; int const src_pos = max_depth_valid_count + thread_valid_count; + sb->nz_idx[rolling_index(src_pos)] = dst_pos; } // update stuff @@ -396,16 +443,16 @@ static __device__ int gpuUpdateValidityAndRowIndicesFlat( int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row); // use definition level & row bounds to determine if is valid - int is_valid; - if (t >= batch_size) { - is_valid = 0; - } else if (def) { - int const def_level = - static_cast(def[rolling_index(value_count + t)]); - is_valid = ((def_level > 0) && in_row_bounds) ? 1 : 0; - } else { - is_valid = in_row_bounds; - } + int const is_valid = [&]() { + if (t >= batch_size) { + return 0; + } else if (def) { + int const def_level = + static_cast(def[rolling_index(value_count + t)]); + return ((def_level > 0) && in_row_bounds) ? 1 : 0; + } + return in_row_bounds; + }(); // thread and block validity count using block_scan = cub::BlockScan; @@ -447,8 +494,9 @@ static __device__ int gpuUpdateValidityAndRowIndicesFlat( // output offset if (is_valid) { - int const dst_pos = value_count + thread_value_count; - int const src_pos = valid_count + thread_valid_count; + int const dst_pos = value_count + thread_value_count; + int const src_pos = valid_count + thread_valid_count; + sb->nz_idx[rolling_index(src_pos)] = dst_pos; } @@ -460,7 +508,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesFlat( if (t == 0) { // update valid value count for decoding and total # of values we've processed ni.valid_count = valid_count; - ni.value_count = value_count; // TODO: remove? this is unused in the non-list path + ni.value_count = value_count; s->nz_count = valid_count; s->input_value_count = value_count; s->input_row_count = value_count; @@ -533,6 +581,239 @@ static __device__ int gpuUpdateValidityAndRowIndicesNonNullable(int32_t target_v return valid_count; } +template +static __device__ int gpuUpdateValidityAndRowIndicesLists(int32_t target_value_count, + page_state_s* s, + state_buf* sb, + level_t const* const def, + level_t const* const rep, + int t) +{ + constexpr int num_warps = decode_block_size / cudf::detail::warp_size; + constexpr int max_batch_size = num_warps * cudf::detail::warp_size; + + // how many (input) values we've processed in the page so far, prior to this loop iteration + int value_count = s->input_value_count; + + // how many rows we've processed in the page so far + int input_row_count = s->input_row_count; + + // cap by last row so that we don't process any rows past what we want to output. + int const first_row = s->first_row; + int const last_row = first_row + s->num_rows; + + int const row_index_lower_bound = s->row_index_lower_bound; + int const max_depth = s->col.max_nesting_depth - 1; + int max_depth_valid_count = s->nesting_info[max_depth].valid_count; + + int const warp_index = t / cudf::detail::warp_size; + int const warp_lane = t % cudf::detail::warp_size; + bool const is_first_lane = (warp_lane == 0); + + __syncthreads(); + __shared__ block_scan_temp_storage temp_storage; + + while (value_count < target_value_count) { + bool const within_batch = value_count + t < target_value_count; + + // get definition level, use repetition level to get start/end depth + // different for each thread, as each thread has a different r/d + auto const [def_level, start_depth, end_depth] = [&]() { + if (!within_batch) { return cuda::std::make_tuple(-1, -1, -1); } + + int const level_index = rolling_index(value_count + t); + int const rep_level = static_cast(rep[level_index]); + int const start_depth = s->nesting_info[rep_level].start_depth; + + if constexpr (!nullable) { + return cuda::std::make_tuple(-1, start_depth, max_depth); + } else { + if (def != nullptr) { + int const def_level = static_cast(def[level_index]); + return cuda::std::make_tuple( + def_level, start_depth, s->nesting_info[def_level].end_depth); + } else { + return cuda::std::make_tuple(1, start_depth, max_depth); + } + } + }(); + + // Determine value count & row index + // track (page-relative) row index for the thread so we can compare against input bounds + // keep track of overall # of rows we've read. + int const is_new_row = start_depth == 0 ? 1 : 0; + int num_prior_new_rows, total_num_new_rows; + { + block_scan_results new_row_scan_results; + scan_block_exclusive_sum(is_new_row, new_row_scan_results, temp_storage); + __syncthreads(); + num_prior_new_rows = new_row_scan_results.thread_count_within_block; + total_num_new_rows = new_row_scan_results.block_count; + } + + int const row_index = input_row_count + ((num_prior_new_rows + is_new_row) - 1); + input_row_count += total_num_new_rows; + int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row); + + // VALUE COUNT: + // in_nesting_bounds: if at a nesting level where we need to add value indices + // the bounds: from current rep to the rep AT the def depth + int in_nesting_bounds = ((0 >= start_depth && 0 <= end_depth) && in_row_bounds) ? 1 : 0; + int thread_value_count_within_warp, warp_value_count, thread_value_count, block_value_count; + { + block_scan_results value_count_scan_results; + scan_block_exclusive_sum( + in_nesting_bounds, value_count_scan_results, temp_storage); + __syncthreads(); + + thread_value_count_within_warp = value_count_scan_results.thread_count_within_warp; + warp_value_count = value_count_scan_results.warp_count; + thread_value_count = value_count_scan_results.thread_count_within_block; + block_value_count = value_count_scan_results.block_count; + } + + // iterate by depth + for (int d_idx = 0; d_idx <= max_depth; d_idx++) { + auto& ni = s->nesting_info[d_idx]; + + // everything up to the max_def_level is a non-null value + int const is_valid = [&](int input_def_level) { + if constexpr (nullable) { + return ((input_def_level >= ni.max_def_level) && in_nesting_bounds) ? 1 : 0; + } else { + return in_nesting_bounds; + } + }(def_level); + + // VALID COUNT: + // Not all values visited by this block will represent a value at this nesting level. + // the validity bit for thread t might actually represent output value t-6. + // the correct position for thread t's bit is thread_value_count. + uint32_t const warp_valid_mask = + WarpReduceOr32((uint32_t)is_valid << thread_value_count_within_warp); + int thread_valid_count, block_valid_count; + { + auto thread_mask = (uint32_t(1) << thread_value_count_within_warp) - 1; + + block_scan_results valid_count_scan_results; + scan_block_exclusive_sum(warp_valid_mask, + warp_lane, + warp_index, + thread_mask, + valid_count_scan_results, + temp_storage); + __syncthreads(); + thread_valid_count = valid_count_scan_results.thread_count_within_block; + block_valid_count = valid_count_scan_results.block_count; + } + + // compute warp and thread value counts for the -next- nesting level. we need to + // do this for lists so that we can emit an offset for the -current- nesting level. + // the offset for the current nesting level == current length of the next nesting level + int next_thread_value_count_within_warp = 0, next_warp_value_count = 0; + int next_thread_value_count = 0, next_block_value_count = 0; + int next_in_nesting_bounds = 0; + if (d_idx < max_depth) { + // NEXT DEPTH VALUE COUNT: + next_in_nesting_bounds = + ((d_idx + 1 >= start_depth) && (d_idx + 1 <= end_depth) && in_row_bounds) ? 1 : 0; + { + block_scan_results next_value_count_scan_results; + scan_block_exclusive_sum( + next_in_nesting_bounds, next_value_count_scan_results, temp_storage); + __syncthreads(); + + next_thread_value_count_within_warp = + next_value_count_scan_results.thread_count_within_warp; + next_warp_value_count = next_value_count_scan_results.warp_count; + next_thread_value_count = next_value_count_scan_results.thread_count_within_block; + next_block_value_count = next_value_count_scan_results.block_count; + } + + // STORE OFFSET TO THE LIST LOCATION + // if we're -not- at a leaf column and we're within nesting/row bounds + // and we have a valid data_out pointer, it implies this is a list column, so + // emit an offset. + if (in_nesting_bounds && ni.data_out != nullptr) { + const auto& next_ni = s->nesting_info[d_idx + 1]; + int const idx = ni.value_count + thread_value_count; + cudf::size_type const ofs = + next_ni.value_count + next_thread_value_count + next_ni.page_start_value; + + (reinterpret_cast(ni.data_out))[idx] = ofs; + } + } + + // validity is processed per-warp (on lane 0's) + // thi is because when atomic writes are needed, they are 32-bit operations + // + // lists always read and write to the same bounds + // (that is, read and write positions are already pre-bounded by first_row/num_rows). + // since we are about to write the validity vector + // here we need to adjust our computed mask to take into account the write row bounds. + if constexpr (nullable) { + if (is_first_lane && (ni.valid_map != nullptr) && (warp_value_count > 0)) { + // absolute bit offset into the output validity map + // is cumulative sum of warp_value_count at the given nesting depth + // DON'T subtract by first_row: since it's lists it's not 1-row-per-value + int const bit_offset = ni.valid_map_offset + thread_value_count; + + store_validity(bit_offset, ni.valid_map, warp_valid_mask, warp_value_count); + } + + if (t == 0) { ni.null_count += block_value_count - block_valid_count; } + } + + // if this is valid and we're at the leaf, output dst_pos + // Read value_count before the sync, so that when thread 0 modifies it we've already read its + // value + int const current_value_count = ni.value_count; + __syncthreads(); // guard against modification of ni.value_count below + if (d_idx == max_depth) { + if (is_valid) { + int const dst_pos = current_value_count + thread_value_count; + int const src_pos = max_depth_valid_count + thread_valid_count; + int const output_index = rolling_index(src_pos); + + // Index from rolling buffer of values (which doesn't include nulls) to final array (which + // includes gaps for nulls) + sb->nz_idx[output_index] = dst_pos; + } + max_depth_valid_count += block_valid_count; + } + + // update stuff + if (t == 0) { + ni.value_count += block_value_count; + ni.valid_map_offset += block_value_count; + } + __syncthreads(); // sync modification of ni.value_count + + // propagate value counts for the next depth level + block_value_count = next_block_value_count; + thread_value_count = next_thread_value_count; + in_nesting_bounds = next_in_nesting_bounds; + warp_value_count = next_warp_value_count; + thread_value_count_within_warp = next_thread_value_count_within_warp; + } // END OF DEPTH LOOP + + int const batch_size = min(max_batch_size, target_value_count - value_count); + value_count += batch_size; + } + + if (t == 0) { + // update valid value count for decoding and total # of values we've processed + s->nesting_info[max_depth].valid_count = max_depth_valid_count; + s->nz_count = max_depth_valid_count; + s->input_value_count = value_count; + + // If we have lists # rows != # values + s->input_row_count = input_row_count; + } + + return max_depth_valid_count; +} + // is the page marked nullable or not __device__ inline bool is_nullable(page_state_s* s) { @@ -560,6 +841,23 @@ __device__ inline bool maybe_has_nulls(page_state_s* s) return run_val != s->col.max_level[lvl]; } +template +__device__ int skip_decode(stream_type& parquet_stream, int num_to_skip, int t) +{ + // it could be that (e.g.) we skip 5000 but starting at row 4000 we have a run of length 2000: + // in that case skip_decode() only skips 4000, and we have to process the remaining 1000 up front + // modulo 2 * block_size of course, since that's as many as we process at once + int num_skipped = parquet_stream.skip_decode(t, num_to_skip); + while (num_skipped < num_to_skip) { + // TODO: Instead of decoding, skip within the run to the appropriate location + auto const to_decode = min(rolling_buf_size, num_to_skip - num_skipped); + num_skipped += parquet_stream.decode_next(t, to_decode); + __syncthreads(); + } + + return num_skipped; +} + /** * @brief Kernel for computing fixed width non dictionary column data stored in the pages * @@ -579,9 +877,10 @@ template + bool has_lists_t, + template typename DecodeValuesFunc> -CUDF_KERNEL void __launch_bounds__(decode_block_size_t) +CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8) gpuDecodePageDataGeneric(PageInfo* pages, device_span chunks, size_t min_row, @@ -621,31 +920,29 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. if (s->num_rows == 0) { return; } - DecodeValuesFunc decode_values; + DecodeValuesFunc decode_values; - bool const nullable = is_nullable(s); - bool const should_process_nulls = nullable && maybe_has_nulls(s); + bool const should_process_nulls = is_nullable(s) && maybe_has_nulls(s); // shared buffer. all shared memory is suballocated out of here - // constexpr int shared_rep_size = has_lists_t ? cudf::util::round_up_unsafe(rle_run_buffer_size * - // sizeof(rle_run), size_t{16}) : 0; + constexpr int shared_rep_size = + has_lists_t + ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) + : 0; constexpr int shared_dict_size = has_dict_t ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) : 0; constexpr int shared_def_size = cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}); - constexpr int shared_buf_size = /*shared_rep_size +*/ shared_dict_size + shared_def_size; + constexpr int shared_buf_size = shared_rep_size + shared_dict_size + shared_def_size; __shared__ __align__(16) uint8_t shared_buf[shared_buf_size]; // setup all shared memory buffers - int shared_offset = 0; - /* - rle_run *rep_runs = reinterpret_cast*>(shared_buf + shared_offset); - if constexpr (has_lists_t){ - shared_offset += shared_rep_size; - } - */ + int shared_offset = 0; + rle_run* rep_runs = reinterpret_cast*>(shared_buf + shared_offset); + if constexpr (has_lists_t) { shared_offset += shared_rep_size; } + rle_run* dict_runs = reinterpret_cast*>(shared_buf + shared_offset); if constexpr (has_dict_t) { shared_offset += shared_dict_size; } rle_run* def_runs = reinterpret_cast*>(shared_buf + shared_offset); @@ -660,38 +957,51 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) def, s->page.num_input_values); } - /* + rle_stream rep_decoder{rep_runs}; level_t* const rep = reinterpret_cast(pp->lvl_decode_buf[level_type::REPETITION]); - if constexpr(has_lists_t){ + if constexpr (has_lists_t) { rep_decoder.init(s->col.level_bits[level_type::REPETITION], s->abs_lvl_start[level_type::REPETITION], s->abs_lvl_end[level_type::REPETITION], rep, s->page.num_input_values); } - */ rle_stream dict_stream{dict_runs}; if constexpr (has_dict_t) { dict_stream.init( s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); } - __syncthreads(); // We use two counters in the loop below: processed_count and valid_count. - // - processed_count: number of rows out of num_input_values that we have decoded so far. + // - processed_count: number of values out of num_input_values that we have decoded so far. // the definition stream returns the number of total rows it has processed in each call // to decode_next and we accumulate in process_count. - // - valid_count: number of non-null rows we have decoded so far. In each iteration of the + // - valid_count: number of non-null values we have decoded so far. In each iteration of the // loop below, we look at the number of valid items (which could be all for non-nullable), // and valid_count is that running count. int processed_count = 0; int valid_count = 0; + + // Skip ahead in the decoding so that we don't repeat work (skipped_leaf_values = 0 for non-lists) + if constexpr (has_lists_t) { + auto const skipped_leaf_values = s->page.skipped_leaf_values; + if (skipped_leaf_values > 0) { + if (should_process_nulls) { + skip_decode(def_decoder, skipped_leaf_values, t); + } + processed_count = skip_decode(rep_decoder, skipped_leaf_values, t); + if constexpr (has_dict_t) { + skip_decode(dict_stream, skipped_leaf_values, t); + } + } + } + // the core loop. decode batches of level stream data using rle_stream objects // and pass the results to gpuDecodeValues // For chunked reads we may not process all of the rows on the page; if not stop early - int last_row = s->first_row + s->num_rows; + int const last_row = s->first_row + s->num_rows; while ((s->error == 0) && (processed_count < s->page.num_input_values) && (s->input_row_count <= last_row)) { int next_valid_count; @@ -701,7 +1011,12 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) processed_count += def_decoder.decode_next(t); __syncthreads(); - if constexpr (has_nesting_t) { + if constexpr (has_lists_t) { + rep_decoder.decode_next(t); + __syncthreads(); + next_valid_count = gpuUpdateValidityAndRowIndicesLists( + processed_count, s, sb, def, rep, t); + } else if constexpr (has_nesting_t) { next_valid_count = gpuUpdateValidityAndRowIndicesNested( processed_count, s, sb, def, t); } else { @@ -713,9 +1028,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t) // this function call entirely since all it will ever generate is a mapping of (i -> i) for // nz_idx. gpuDecodeFixedWidthValues would be the only work that happens. else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - next_valid_count = - gpuUpdateValidityAndRowIndicesNonNullable(processed_count, s, sb, t); + if constexpr (has_lists_t) { + processed_count += rep_decoder.decode_next(t); + __syncthreads(); + next_valid_count = gpuUpdateValidityAndRowIndicesLists( + processed_count, s, sb, nullptr, rep, t); + } else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + next_valid_count = + gpuUpdateValidityAndRowIndicesNonNullable(processed_count, s, sb, t); + } } __syncthreads(); @@ -745,6 +1067,7 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, size_t min_row, int level_type_size, bool has_nesting, + bool is_list, kernel_error::pointer error_code, rmm::cuda_stream_view stream) { @@ -754,12 +1077,23 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - if (has_nesting) { + if (is_list) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -769,17 +1103,29 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, decode_kernel_mask::FIXED_WIDTH_NO_DICT, false, false, + false, decode_fixed_width_values_func> <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } } else { - if (has_nesting) { + if (is_list) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -789,6 +1135,7 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, decode_kernel_mask::FIXED_WIDTH_NO_DICT, false, false, + false, decode_fixed_width_values_func> <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -802,6 +1149,7 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa size_t min_row, int level_type_size, bool has_nesting, + bool is_list, kernel_error::pointer error_code, rmm::cuda_stream_view stream) { @@ -811,12 +1159,23 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { - if (has_nesting) { + if (is_list) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -826,17 +1185,29 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa decode_kernel_mask::FIXED_WIDTH_DICT, true, false, + false, decode_fixed_width_values_func> <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } } else { - if (has_nesting) { + if (is_list) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -846,6 +1217,7 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa decode_kernel_mask::FIXED_WIDTH_DICT, true, false, + true, decode_fixed_width_values_func> <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -860,6 +1232,7 @@ DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, size_t min_row, int level_type_size, bool has_nesting, + bool is_list, kernel_error::pointer error_code, rmm::cuda_stream_view stream) { @@ -869,12 +1242,23 @@ DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { - if (has_nesting) { + if (is_list) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -884,17 +1268,29 @@ DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT, false, false, + false, decode_fixed_width_split_values_func> <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); } } else { - if (has_nesting) { + if (is_list) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else if (has_nesting) { gpuDecodePageDataGeneric <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); @@ -904,6 +1300,7 @@ DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT, false, false, + false, decode_fixed_width_split_values_func> <<>>( pages.device_ptr(), chunks, min_row, num_rows, error_code); diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index d604642be54..52d53cb8225 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -183,17 +183,20 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, return decode_kernel_mask::STRING; } - if (!is_list(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { + if (!is_byte_array(chunk) && !is_boolean(chunk)) { if (page.encoding == Encoding::PLAIN) { - return is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED - : decode_kernel_mask::FIXED_WIDTH_NO_DICT; + return is_list(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST + : is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED + : decode_kernel_mask::FIXED_WIDTH_NO_DICT; } else if (page.encoding == Encoding::PLAIN_DICTIONARY || page.encoding == Encoding::RLE_DICTIONARY) { - return is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_DICT_NESTED - : decode_kernel_mask::FIXED_WIDTH_DICT; + return is_list(chunk) ? decode_kernel_mask::FIXED_WIDTH_DICT_LIST + : is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_DICT_NESTED + : decode_kernel_mask::FIXED_WIDTH_DICT; } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { - return is_nested(chunk) ? decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED - : decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT; + return is_list(chunk) ? decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST + : is_nested(chunk) ? decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED + : decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT; } } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index be502b581af..dba24b553e6 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -220,6 +220,10 @@ enum class decode_kernel_mask { (1 << 9), // Same as above but for nested, fixed-width data FIXED_WIDTH_NO_DICT_NESTED = (1 << 10), // Run decode kernel for fixed width non-dictionary pages FIXED_WIDTH_DICT_NESTED = (1 << 11), // Run decode kernel for fixed width dictionary pages + FIXED_WIDTH_DICT_LIST = (1 << 12), // Run decode kernel for fixed width dictionary pages + FIXED_WIDTH_NO_DICT_LIST = (1 << 13), // Run decode kernel for fixed width non-dictionary pages + BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST = + (1 << 14), // Run decode kernel for BYTE_STREAM_SPLIT encoded data for fixed width lists }; // mask representing all the ways in which a string can be encoded @@ -908,6 +912,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span pages, * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding * @param[in] has_nesting Whether or not the data contains nested (but not list) data. + * @param[in] is_list Whether or not the data contains list data. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ @@ -917,6 +922,7 @@ void DecodePageDataFixed(cudf::detail::hostdevice_span pages, size_t min_row, int level_type_size, bool has_nesting, + bool is_list, kernel_error::pointer error_code, rmm::cuda_stream_view stream); @@ -932,6 +938,7 @@ void DecodePageDataFixed(cudf::detail::hostdevice_span pages, * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding * @param[in] has_nesting Whether or not the data contains nested (but not list) data. + * @param[in] is_list Whether or not the data contains list data. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ @@ -941,6 +948,7 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, size_t min_row, int level_type_size, bool has_nesting, + bool is_list, kernel_error::pointer error_code, rmm::cuda_stream_view stream); @@ -956,6 +964,7 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding * @param[in] has_nesting Whether or not the data contains nested (but not list) data. + * @param[in] is_list Whether or not the data contains list data. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ @@ -965,6 +974,7 @@ void DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages size_t min_row, int level_type_size, bool has_nesting, + bool is_list, kernel_error::pointer error_code, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index fed1a309064..689386b8957 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -272,6 +272,7 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num skip_rows, level_type_size, false, + false, error_code.data(), streams[s_idx++]); } @@ -284,6 +285,20 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num skip_rows, level_type_size, true, + false, + error_code.data(), + streams[s_idx++]); + } + + // launch byte stream split decoder, for list columns + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST) != 0) { + DecodeSplitPageFixedWidthData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + true, + true, error_code.data(), streams[s_idx++]); } @@ -307,6 +322,20 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num skip_rows, level_type_size, false, + false, + error_code.data(), + streams[s_idx++]); + } + + // launch fixed width type decoder for lists + if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) != 0) { + DecodePageDataFixed(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + true, + true, error_code.data(), streams[s_idx++]); } @@ -319,6 +348,7 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num skip_rows, level_type_size, true, + false, error_code.data(), streams[s_idx++]); } @@ -331,6 +361,20 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num skip_rows, level_type_size, false, + false, + error_code.data(), + streams[s_idx++]); + } + + // launch fixed width type decoder with dictionaries for lists + if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_LIST) != 0) { + DecodePageDataFixedDict(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + true, + true, error_code.data(), streams[s_idx++]); } @@ -343,6 +387,7 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num skip_rows, level_type_size, true, + false, error_code.data(), streams[s_idx++]); } diff --git a/cpp/src/io/parquet/rle_stream.cuh b/cpp/src/io/parquet/rle_stream.cuh index 4a0791d5c54..69e783a89d0 100644 --- a/cpp/src/io/parquet/rle_stream.cuh +++ b/cpp/src/io/parquet/rle_stream.cuh @@ -19,6 +19,7 @@ #include "parquet_gpu.hpp" #include +#include namespace cudf::io::parquet::detail { @@ -216,6 +217,26 @@ struct rle_stream { decode_index = -1; // signals the first iteration. Nothing to decode. } + __device__ inline int get_rle_run_info(rle_run& run) + { + run.start = cur; + run.level_run = get_vlq32(run.start, end); + + // run_bytes includes the header size + int run_bytes = run.start - cur; + if (is_literal_run(run.level_run)) { + // from the parquet spec: literal runs always come in multiples of 8 values. + run.size = (run.level_run >> 1) * 8; + run_bytes += util::div_rounding_up_unsafe(run.size * level_bits, 8); + } else { + // repeated value run + run.size = (run.level_run >> 1); + run_bytes += util::div_rounding_up_unsafe(level_bits, 8); + } + + return run_bytes; + } + __device__ inline void fill_run_batch() { // decode_index == -1 means we are on the very first decode iteration for this stream. @@ -226,31 +247,14 @@ struct rle_stream { while (((decode_index == -1 && fill_index < num_rle_stream_decode_warps) || fill_index < decode_index + run_buffer_size) && cur < end) { - auto& run = runs[rolling_index(fill_index)]; - // Encoding::RLE + // Pass by reference to fill the runs shared memory with the run data + auto& run = runs[rolling_index(fill_index)]; + int const run_bytes = get_rle_run_info(run); - // bytes for the varint header - uint8_t const* _cur = cur; - int const level_run = get_vlq32(_cur, end); - // run_bytes includes the header size - int run_bytes = _cur - cur; - - // literal run - if (is_literal_run(level_run)) { - // from the parquet spec: literal runs always come in multiples of 8 values. - run.size = (level_run >> 1) * 8; - run_bytes += ((run.size * level_bits) + 7) >> 3; - } - // repeated value run - else { - run.size = (level_run >> 1); - run_bytes += ((level_bits) + 7) >> 3; - } - run.output_pos = output_pos; - run.start = _cur; - run.level_run = level_run; run.remaining = run.size; + run.output_pos = output_pos; + cur += run_bytes; output_pos += run.size; fill_index++; @@ -372,6 +376,39 @@ struct rle_stream { return values_processed_shared; } + __device__ inline int skip_runs(int target_count) + { + // we want to process all runs UP TO BUT NOT INCLUDING the run that overlaps with the skip + // amount so threads spin like crazy on fill_run_batch(), skipping writing unnecessary run info. + // then when it hits the one that matters, we don't process it at all and bail as if we never + // started basically we're setting up the rle_stream vars necessary to start fill_run_batch for + // the first time + while (cur < end) { + rle_run run; + int run_bytes = get_rle_run_info(run); + + if ((output_pos + run.size) > target_count) { + return output_pos; // bail! we've reached the starting run + } + + // skip this run + output_pos += run.size; + cur += run_bytes; + } + + return output_pos; // we skipped everything + } + + __device__ inline int skip_decode(int t, int count) + { + int const output_count = min(count, total_values - cur_values); + + // if level_bits == 0, there's nothing to do + // a very common case: columns with no nulls, especially if they are non-nested + cur_values = (level_bits == 0) ? output_count : skip_runs(output_count); + return cur_values; + } + __device__ inline int decode_next(int t) { return decode_next(t, max_output_values); } }; From 6328ad679947eb5cbc352c345a28f079aa6b8005 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Wed, 30 Oct 2024 17:17:47 +0800 Subject: [PATCH 6/7] Make ai.rapids.cudf.HostMemoryBuffer#copyFromStream public. (#17179) This is the first pr of [a larger one](https://github.com/NVIDIA/spark-rapids-jni/pull/2532) to introduce a new serialization format. It make `ai.rapids.cudf.HostMemoryBuffer#copyFromStream` public. For more background, see https://github.com/NVIDIA/spark-rapids-jni/issues/2496 Authors: - Renjie Liu (https://github.com/liurenjie1024) - Jason Lowe (https://github.com/jlowe) Approvers: - Jason Lowe (https://github.com/jlowe) - Alessandro Bellina (https://github.com/abellina) URL: https://github.com/rapidsai/cudf/pull/17179 --- java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java b/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java index d792459901c..bfb959b12c1 100644 --- a/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java +++ b/java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java @@ -255,8 +255,10 @@ public final void copyFromHostBuffer(long destOffset, HostMemoryBuffer srcData, * @param destOffset offset in bytes in this buffer to start copying to * @param in input stream to copy bytes from * @param byteLength number of bytes to copy + * @throws EOFException If there are not enough bytes in the stream to copy. + * @throws IOException If there is an error reading from the stream. */ - final void copyFromStream(long destOffset, InputStream in, long byteLength) throws IOException { + public final void copyFromStream(long destOffset, InputStream in, long byteLength) throws IOException { addressOutOfBoundsCheck(address + destOffset, byteLength, "copy from stream"); byte[] arrayBuffer = new byte[(int) Math.min(1024 * 128, byteLength)]; long left = byteLength; @@ -264,7 +266,7 @@ final void copyFromStream(long destOffset, InputStream in, long byteLength) thro int amountToCopy = (int) Math.min(arrayBuffer.length, left); int amountRead = in.read(arrayBuffer, 0, amountToCopy); if (amountRead < 0) { - throw new EOFException(); + throw new EOFException("Unexpected end of stream, expected " + left + " more bytes"); } setBytes(destOffset, arrayBuffer, 0, amountRead); destOffset += amountRead; From 5ee7d7caaf459e7d30597e6ea2dd1904c50d12fc Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Wed, 30 Oct 2024 10:13:28 -0400 Subject: [PATCH 7/7] [no ci] Add empty-columns section to the libcudf developer guide (#17183) Adds a section on `Empty Columns` to the libcudf DEVELOPER_GUIDE Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Basit Ayantunde (https://github.com/lamarrr) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/17183 --- cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md b/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md index 311539efbfc..1c1052487f2 100644 --- a/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md +++ b/cpp/doxygen/developer_guide/DEVELOPER_GUIDE.md @@ -1483,6 +1483,17 @@ struct, and therefore `cudf::struct_view` is the data type of a `cudf::column` o `cudf::type_dispatcher` dispatches to the `struct_view` data type when invoked on a `STRUCT` column. +# Empty Columns + +The libcudf columns support empty, typed content. These columns have no data and no validity mask. +Empty strings or lists columns may or may not contain a child offsets column. +It is undefined behavior (UB) to access the offsets child of an empty strings or lists column. +Nested columns like lists and structs may require other children columns to provide the +nested structure of the empty types. + +Use `cudf::make_empty_column()` to create fixed-width and strings columns. +Use `cudf::empty_like()` to create an empty column from an existing `cudf::column_view`. + # cuIO: file reading and writing cuIO is a component of libcudf that provides GPU-accelerated reading and writing of data file