Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parquet predicate filtering with column projection #15113

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
be089f3
fix stats filter conversion dtypes and names
karthikeyann Feb 21, 2024
f458410
filter columns limitation fixed.
karthikeyann Mar 1, 2024
b01b2d8
address review comments, added docstring
karthikeyann Mar 1, 2024
b348db4
Merge branch 'branch-24.04' into fix-pq_filter_col_projection
karthikeyann Mar 1, 2024
4a07e3d
add docstring for filter
karthikeyann Mar 1, 2024
6ee2bcf
Merge branch 'branch-24.04' into fix-pq_filter_col_projection
karthikeyann Mar 6, 2024
acb0723
update docs with example
karthikeyann Mar 6, 2024
bff38f5
Merge branch 'fix-pq_filter_col_projection' of github.com:karthikeyan…
karthikeyann Mar 6, 2024
d643ce1
Merge branch 'branch-24.04' into fix-pq_filter_col_projection
karthikeyann Mar 6, 2024
e79552c
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann Apr 9, 2024
e40cffc
address review comments, include cleanup, reorg code
karthikeyann Apr 24, 2024
926a75a
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann Apr 24, 2024
a220d7d
fix col index ref on projection
karthikeyann May 10, 2024
c0e734c
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann May 10, 2024
96ea0e8
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
vuule May 14, 2024
47c5413
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
mhaseeb123 May 15, 2024
9e4008e
remove caching output dtypes
karthikeyann May 16, 2024
cc3bd26
Merge branch 'branch-24.06' into fix-pq_filter_col_projection
karthikeyann May 16, 2024
f64294e
Merge branch 'fix-pq_filter_col_projection' of github.com:karthikeya…
karthikeyann May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions cpp/src/io/parquet/predicate_pushdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,46 @@ class stats_expression_converter : public ast::detail::expression_transformer {
};
} // namespace

/**
 * @brief Collects the data types and names of the root-level columns of the parquet schema.
 *
 * The children of the schema root (index 0) are visited in schema order; negative child
 * indices are skipped. The results are stored in `_root_level_types` and `_root_level_names`
 * and returned as spans over those members.
 *
 * Once the cache is populated, later calls return it unchanged without looking at the
 * arguments.
 * NOTE(review): callers presumably always pass the same `strings_to_categorical` and
 * `timestamp_type_id` for a given metadata object - confirm.
 *
 * @param strings_to_categorical Forwarded to `to_type_id` when resolving a column type
 * @param timestamp_type_id Forwarded to `to_type_id` when resolving a column type
 * @return Tuple of (root-level column types, root-level column names)
 */
std::tuple<host_span<data_type const>, host_span<std::string const>>
aggregate_reader_metadata::get_schema_dtypes(bool strings_to_categorical, type_id timestamp_type_id)
{
  // TODO: get types and names only for the columns referenced by the filter, with their col_idx.
  // Reuse the cached root-level types and names when they were already computed.
  if (!_root_level_types.empty()) return {_root_level_types, _root_level_names};

  // Resolves the type of a root-level column only; children of nested columns are not visited.
  // `std::function` is needed because the lambda calls itself to step through stub elements.
  std::function<cudf::data_type(int)> get_dtype = [strings_to_categorical,
                                                   timestamp_type_id,
                                                   &get_dtype,
                                                   this](int schema_idx) -> cudf::data_type {
    auto const& schema_elem = get_schema(schema_idx);
    if (schema_elem.is_stub()) {
      // A stub carries no type of its own: resolve the type of its single child instead.
      CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub");
      return get_dtype(schema_elem.children_idx[0]);
    }

    auto const one_level_list = schema_elem.is_one_level_list(get_schema(schema_elem.parent_idx));
    // One-level lists are reported as LIST; everything else goes through the regular mapping.
    auto const col_type = one_level_list
                            ? type_id::LIST
                            : to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
    // path_is_valid is skipped for nested columns here. TODO: more test cases where no leaf.
    return to_data_type(col_type, schema_elem);
  };

  auto const& root = get_schema(0);
  _root_level_types.reserve(root.children_idx.size());
  _root_level_names.reserve(root.children_idx.size());
  for (auto const& schema_idx : root.children_idx) {
    if (schema_idx < 0) { continue; }
    _root_level_types.push_back(get_dtype(schema_idx));
    _root_level_names.push_back(get_schema(schema_idx).name);
  }
  return {_root_level_types, _root_level_names};
}

std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
Expand Down Expand Up @@ -410,8 +447,8 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
// For each column, it contains #sources * #column_chunks_per_src rows.
std::vector<std::unique_ptr<column>> columns;
stats_caster stats_col{total_row_groups, per_file_metadata, input_row_group_indices};
for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const& dtype = output_dtypes[col_idx];
for (size_t col_idx = 0; col_idx < _root_level_types.size(); col_idx++) {
auto const& dtype = _root_level_types[col_idx];
// Only comparable types except fixed point are supported.
if (cudf::is_compound(dtype) && dtype.id() != cudf::type_id::STRING) {
// placeholder only for unsupported types.
Expand All @@ -427,9 +464,13 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::fi
columns.push_back(std::move(max_col));
}
auto stats_table = cudf::table(std::move(columns));
// Convert the named filter to a reference filter w.r.t. the parquet schema order.
auto expr_conv = named_to_reference_converter(filter, _root_level_names);
auto reference_filter = expr_conv.get_converted_expr();

// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
stats_expression_converter stats_expr{filter, static_cast<size_type>(output_dtypes.size())};
stats_expression_converter stats_expr{reference_filter.value().get(),
static_cast<size_type>(_root_level_types.size())};
auto stats_ast = stats_expr.get_stats_expr();
auto predicate_col = cudf::detail::compute_column(stats_table, stats_ast.get(), stream, mr);
auto predicate = predicate_col->view();
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ reader::impl::impl(std::size_t chunk_read_limit,
_strings_to_categorical,
_timestamp_type.id());

// Find the names and dtypes of the parquet root-level schema (saved in _metadata).
_metadata->get_schema_dtypes(_strings_to_categorical, _timestamp_type.id());

// Save the states of the output buffers for reuse in `chunk_read()`.
for (auto const& buff : _output_buffers) {
_output_buffers_template.emplace_back(cudf::io::detail::inline_column_buffer::empty_like(buff));
Expand Down Expand Up @@ -508,7 +511,7 @@ table_with_metadata reader::impl::read(
auto expr_conv = named_to_reference_converter(filter, metadata);
auto output_filter = expr_conv.get_converted_expr();

prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices, output_filter);
prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices, filter);
return read_chunk_internal(uses_custom_row_bounds, output_filter);
}

Expand Down
5 changes: 2 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,13 @@ aggregate_reader_metadata::select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t skip_rows_opt,
std::optional<size_type> const& num_rows_opt,
host_span<data_type const> output_dtypes,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const
{
std::optional<std::vector<std::vector<size_type>>> filtered_row_group_indices;
// if filter is not empty, then gather row groups to read after predicate pushdown
if (filter.has_value()) {
filtered_row_group_indices =
filter_row_groups(row_group_indices, output_dtypes, filter.value(), stream);
filtered_row_group_indices = filter_row_groups(row_group_indices, filter.value(), stream);
if (filtered_row_group_indices.has_value()) {
row_group_indices =
host_span<std::vector<size_type> const>(filtered_row_group_indices.value());
Expand Down
28 changes: 21 additions & 7 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,6 +82,8 @@ class aggregate_reader_metadata {
int64_t num_rows;
size_type num_row_groups;

std::vector<data_type> _root_level_types;
std::vector<std::string> _root_level_names;
/**
* @brief Create a metadata object from each element in the source vector
*/
Expand Down Expand Up @@ -167,18 +169,19 @@ class aggregate_reader_metadata {
*/
[[nodiscard]] std::vector<std::string> get_pandas_index_names() const;

/**
 * @brief Gets the data types and names of the root-level columns of the parquet schema
 *
 * The values are stored in `_root_level_types` and `_root_level_names` on first use and
 * returned from there on later calls.
 *
 * @param strings_to_categorical Type conversion parameter
 * @param timestamp_type_id Type conversion parameter
 * @return A tuple of spans over the root-level column types and the root-level column names
 */
std::tuple<host_span<data_type const>, host_span<std::string const>> get_schema_dtypes(
bool strings_to_categorical, type_id timestamp_type_id);

/**
* @brief Filters the row groups based on predicate filter
*
* @param row_group_indices Lists of row groups to read, one per source
* @param output_dtypes List of output column datatypes
* @param filter AST expression to filter row groups based on Column chunk statistics
wence- marked this conversation as resolved.
Show resolved Hide resolved
* @param stream CUDA stream used for device memory operations and kernel launches
* @return Filtered row group indices, if any are filtered
*/
[[nodiscard]] std::optional<std::vector<std::vector<size_type>>> filter_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const;

Expand All @@ -191,7 +194,6 @@ class aggregate_reader_metadata {
* @param row_group_indices Lists of row groups to read, one per source
* @param row_start Starting row of the selection
* @param row_count Total number of rows selected
* @param output_dtypes List of output column datatypes
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of corrected row_start, row_count and list of row group indexes and its
Expand All @@ -201,7 +203,6 @@ class aggregate_reader_metadata {
host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;

Expand Down Expand Up @@ -234,7 +235,6 @@ class named_to_reference_converter : public ast::detail::expression_transformer
public:
named_to_reference_converter(std::optional<std::reference_wrapper<ast::expression const>> expr,
table_metadata const& metadata)
: metadata(metadata)
{
if (!expr.has_value()) return;
// create map for column name.
Expand All @@ -251,6 +251,21 @@ class named_to_reference_converter : public ast::detail::expression_transformer
expr.value().get().accept(*this);
}

/**
 * @brief Builds the converter from a filter expression and a list of root-level column names
 *
 * Each name is mapped to its position in `root_column_names`, then the expression is
 * visited with this converter.
 *
 * @param expr Filter expression to visit
 * @param root_column_names Column names; a name's position in this span becomes its index
 */
named_to_reference_converter(std::reference_wrapper<ast::expression const> expr,
                             host_span<std::string const> root_column_names)
{
  // Create the map from column name to index. `emplace` leaves an existing entry untouched,
  // so a repeated name keeps the index of its first occurrence.
  size_type column_index = 0;
  for (auto const& column_name : root_column_names) {
    column_name_to_index.emplace(column_name, column_index);
    ++column_index;
  }

  expr.get().accept(*this);
}
/**
* @copydoc ast::detail::expression_transformer::visit(ast::literal const& )
*/
Expand Down Expand Up @@ -284,7 +299,6 @@ class named_to_reference_converter : public ast::detail::expression_transformer
std::vector<std::reference_wrapper<ast::expression const>> visit_operands(
std::vector<std::reference_wrapper<ast::expression const>> operands);

table_metadata const& metadata;
std::unordered_map<std::string, size_type> column_name_to_index;
std::optional<std::reference_wrapper<ast::expression const>> _stats_expr;
// Using std::list or std::deque to avoid reference invalidation
Expand Down
11 changes: 1 addition & 10 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1089,18 +1089,9 @@ void reader::impl::preprocess_file(
{
CUDF_EXPECTS(!_file_preprocessed, "Attempted to preprocess file more than once");

// if filter is not empty, then create output types as vector and pass for filtering.
std::vector<data_type> output_types;
if (filter.has_value()) {
std::transform(_output_buffers.cbegin(),
_output_buffers.cend(),
std::back_inserter(output_types),
[](auto const& col) { return col.type; });
}
std::tie(
_file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) =
_metadata->select_row_groups(
row_group_indices, skip_rows, num_rows, output_types, filter, _stream);
_metadata->select_row_groups(row_group_indices, skip_rows, num_rows, filter, _stream);

if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() &&
not _input_columns.empty()) {
Expand Down
21 changes: 21 additions & 0 deletions cpp/tests/io/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,27 @@ TEST_F(ParquetReaderTest, FilterIdentity)
CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *result2.tbl);
}

// Reads a projected, reordered subset of columns with a name-based filter and compares the
// result against the same projection filtered in memory with an index-based expression.
TEST_F(ParquetReaderTest, FilterWithColumnProjection)
{
  auto [src, filepath] = create_parquet_with_stats("FilterWithColumnProjection.parquet");

  // Literal operand shared by both filter expressions.
  auto threshold     = cudf::numeric_scalar<uint32_t>{10};
  auto threshold_lit = cudf::ast::literal{threshold};

  // Filter handed to the reader: the column is referenced by name.
  auto name_ref   = cudf::ast::column_name_reference{"col_uint32"};
  auto named_expr = cudf::ast::operation(cudf::ast::ast_operator::LESS, name_ref, threshold_lit);

  // Filter evaluated on the in-memory table: the column is referenced by position 0.
  auto index_ref = cudf::ast::column_reference{0};
  auto indexed_expr =
    cudf::ast::operation(cudf::ast::ast_operator::LESS, index_ref, threshold_lit);

  // Expected table: columns 2 and 0 of the source, masked by the in-memory predicate.
  auto row_mask  = cudf::compute_column(src, indexed_expr);
  auto projected = cudf::table_view{{src.get_column(2), src.get_column(0)}};
  auto expected  = cudf::apply_boolean_mask(projected, *row_mask);

  auto read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
                     .columns({"col_double", "col_uint32"})
                     .filter(named_expr);
  auto result = cudf::io::read_parquet(read_opts);
  CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);
}

TEST_F(ParquetReaderTest, FilterReferenceExpression)
{
auto [src, filepath] = create_parquet_with_stats("FilterReferenceExpression.parquet");
Expand Down
Loading