GH-45283: [C++][Parquet] Omit level histogram when max level is 0 (#45285)

The level histogram in size statistics can be omitted when its max level is 0. parquet-cpp had not implemented this: it enforced the histogram size to always equal `max_level + 1`, and reading a Parquet file whose level histogram was omitted threw an exception.
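For context, nothing is lost when the histogram is omitted: with a max level of 0 the only possible level is 0, and its count equals the value count of the column chunk or data page. A minimal sketch of how a reader could reconstruct it (the helper below is hypothetical, not part of this patch):

#include <cstdint>
#include <vector>

// Hypothetical helper, not part of the patch: when max_level == 0, every level
// is 0, so the single histogram bucket equals num_values of the column chunk
// or data page.
std::vector<int64_t> EffectiveLevelHistogram(std::vector<int64_t> histogram,
                                             int16_t max_level, int64_t num_values) {
  if (max_level == 0 && histogram.empty()) {
    return {num_values};  // one bucket, for level 0
  }
  return histogram;  // otherwise the size must be max_level + 1
}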

This change omits the level histogram when the max level is 0.

A test case has been added to reflect the change.

There are no user-facing changes.
* GitHub Issue: #45283

Lead-authored-by: Gang Wu <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
2 people authored and amoeba committed Jan 31, 2025
1 parent d7385e4 commit f1f860d
Showing 3 changed files with 126 additions and 28 deletions.
9 changes: 5 additions & 4 deletions cpp/src/parquet/column_writer.cc
@@ -1606,11 +1606,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   }
 
   auto add_levels = [](std::vector<int64_t>& level_histogram,
-                       ::arrow::util::span<const int16_t> levels) {
-    for (int16_t level : levels) {
-      ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
-      ++level_histogram[level];
+                       ::arrow::util::span<const int16_t> levels, int16_t max_level) {
+    if (max_level == 0) {
+      return;
     }
+    ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size());
+    ::parquet::UpdateLevelHistogram(levels, level_histogram);
   };
 
   if (descr_->max_definition_level() > 0) {
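The removed loop above is the plain form of the histogram update that the shared ::parquet::UpdateLevelHistogram now performs. A self-contained sketch of that reference behavior (using C++20 std::span in place of ::arrow::util::span; the library version is an optimized implementation, e.g. the LargePage test below exercises a dedicated path for max_level == 1):

#include <cstdint>
#include <span>
#include <vector>

// Reference behavior only, mirroring the removed loop; not the library's
// optimized implementation.
void UpdateLevelHistogramNaive(std::span<const int16_t> levels,
                               std::vector<int64_t>& histogram) {
  for (int16_t level : levels) {
    // The histogram is expected to hold max_level + 1 buckets.
    ++histogram[level];
  }
}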
40 changes: 26 additions & 14 deletions cpp/src/parquet/size_statistics.cc
@@ -54,23 +54,28 @@ void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) {
 }
 
 void SizeStatistics::Validate(const ColumnDescriptor* descr) const {
-  if (repetition_level_histogram.size() !=
-      static_cast<size_t>(descr->max_repetition_level() + 1)) {
-    throw ParquetException("Repetition level histogram size mismatch");
-  }
-  if (definition_level_histogram.size() !=
-      static_cast<size_t>(descr->max_definition_level() + 1)) {
-    throw ParquetException("Definition level histogram size mismatch");
-  }
+  auto validate_histogram = [](const std::vector<int64_t>& histogram, int16_t max_level,
+                               const std::string& name) {
+    if (histogram.empty()) {
+      // A levels histogram is always allowed to be missing.
+      return;
+    }
+    if (histogram.size() != static_cast<size_t>(max_level + 1)) {
+      std::stringstream ss;
+      ss << name << " level histogram size mismatch, size: " << histogram.size()
+         << ", expected: " << (max_level + 1);
+      throw ParquetException(ss.str());
+    }
+  };
+  validate_histogram(repetition_level_histogram, descr->max_repetition_level(),
+                     "Repetition");
+  validate_histogram(definition_level_histogram, descr->max_definition_level(),
+                     "Definition");
   if (unencoded_byte_array_data_bytes.has_value() &&
       descr->physical_type() != Type::BYTE_ARRAY) {
     throw ParquetException("Unencoded byte array data bytes does not support " +
                            TypeToString(descr->physical_type()));
   }
-  if (!unencoded_byte_array_data_bytes.has_value() &&
-      descr->physical_type() == Type::BYTE_ARRAY) {
-    throw ParquetException("Missing unencoded byte array data bytes");
-  }
 }
 
 void SizeStatistics::Reset() {
@@ -83,8 +88,15 @@ void SizeStatistics::Reset() {
 
 std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* descr) {
   auto size_stats = std::make_unique<SizeStatistics>();
-  size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0);
-  size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0);
+  // If the max level is 0, the level histogram can be omitted because it contains
+  // only a single level (i.e. level 0) whose count is equivalent to `num_values` of
+  // the column chunk or data page.
+  if (descr->max_repetition_level() != 0) {
+    size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0);
+  }
+  if (descr->max_definition_level() != 0) {
+    size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0);
+  }
   if (descr->physical_type() == Type::BYTE_ARRAY) {
     size_stats->unencoded_byte_array_data_bytes = 0;
   }
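A minimal sketch of the new SizeStatistics::Make() behavior (assuming parquet-cpp headers; the ColumnDescriptor construction here is illustrative):

#include <cassert>

#include "parquet/schema.h"
#include "parquet/size_statistics.h"

int main() {
  using namespace parquet;
  // Required flat column: max repetition/definition levels are both 0,
  // so both level histograms are now left empty (omitted).
  ColumnDescriptor required_descr(
      schema::PrimitiveNode::Make("a", Repetition::REQUIRED, Type::INT32),
      /*max_definition_level=*/0, /*max_repetition_level=*/0);
  auto stats = SizeStatistics::Make(&required_descr);
  assert(stats->repetition_level_histogram.empty());
  assert(stats->definition_level_histogram.empty());

  // Optional flat column: max_definition_level is 1, so the definition-level
  // histogram keeps max_level + 1 = 2 buckets.
  ColumnDescriptor optional_descr(
      schema::PrimitiveNode::Make("b", Repetition::OPTIONAL, Type::INT32),
      /*max_definition_level=*/1, /*max_repetition_level=*/0);
  auto opt_stats = SizeStatistics::Make(&optional_descr);
  assert(opt_stats->definition_level_histogram.size() == 2);
  return 0;
}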
105 changes: 95 additions & 10 deletions cpp/src/parquet/size_statistics_test.cc
@@ -168,12 +168,22 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     }
   }
 
-  void Reset() {
-    buffer_.reset();
-    row_group_stats_.clear();
-    page_stats_.clear();
+  void ReadData() {
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
+    auto metadata = reader->metadata();
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      int64_t num_rows = metadata->RowGroup(i)->num_rows();
+      auto row_group_reader = reader->RowGroup(i);
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        auto column_reader = row_group_reader->RecordReader(j);
+        ASSERT_EQ(column_reader->ReadRecords(num_rows + 1), num_rows);
+      }
+    }
   }
 
+  void Reset() { buffer_.reset(); }
+
  protected:
   std::shared_ptr<Buffer> buffer_;
   std::vector<SizeStatistics> row_group_stats_;
@@ -256,24 +266,99 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
   ReadSizeStatistics();
   EXPECT_THAT(row_group_stats_,
               ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/5},
                                      SizeStatistics{/*def_levels=*/{1, 1},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/1},
                                      SizeStatistics{/*def_levels=*/{0, 2},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/4}));
   EXPECT_THAT(page_stats_,
               ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         /*byte_array_bytes=*/{5}},
                                      PageSizeStatistics{/*def_levels=*/{1, 1},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         /*byte_array_bytes=*/{1}},
                                      PageSizeStatistics{/*def_levels=*/{0, 2},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         /*byte_array_bytes=*/{4}}));
 }
 
+TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) {
+  // Rep/def level histograms are updated in batches of `write_batch_size` levels
+  // inside a single page. Exercise the logic with more than one batch per page.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::list(::arrow::utf8()))});
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      [ [null,"a","ab"] ],
+      [ null ],
+      [ [] ],
+      [ [null,"d","de"] ],
+      [ ["g","gh",null] ],
+      [ ["j","jk",null] ]
+    ])"});
+  for (int write_batch_size : {100, 5, 4, 3, 2, 1}) {
+    ARROW_SCOPED_TRACE("write_batch_size = ", write_batch_size);
+    WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+              /*max_row_group_length=*/1000, /*page_size=*/1000, write_batch_size);
+    ReadSizeStatistics();
+    EXPECT_THAT(row_group_stats_,
+                ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                      /*rep_levels=*/{6, 8},
+                                                      /*byte_array_bytes=*/12}));
+    EXPECT_THAT(page_stats_,
+                ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                          /*rep_levels=*/{6, 8},
+                                                          /*byte_array_bytes=*/{12}}));
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, LargePage) {
+  // When max_level is 1, the levels are summed in chunks of 2**30, so exercise
+  // this logic with a 90000-row table.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::utf8())});
+  auto seed_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [ "a" ],
+      [ "bc" ],
+      [ null ]
+    ])");
+  ASSERT_OK_AND_ASSIGN(auto table, ::arrow::Table::FromRecordBatches(
+                                       ::arrow::RecordBatchVector(30000, seed_batch)));
+  ASSERT_OK_AND_ASSIGN(table, table->CombineChunks());
+  ASSERT_EQ(table->num_rows(), 90000);
+
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+            /*max_row_group_length=*/1 << 30, /*page_size=*/1 << 30,
+            /*write_batch_size=*/50000);
+  ReadSizeStatistics();
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{30000, 60000},
+                                                    /*rep_levels=*/{},
+                                                    /*byte_array_bytes=*/90000}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{30000, 60000},
+                                                        /*rep_levels=*/{},
+                                                        /*byte_array_bytes=*/{90000}}));
+}
+
+TEST_F(SizeStatisticsRoundTripTest, MaxLevelZero) {
+  auto schema =
+      ::arrow::schema({::arrow::field("a", ::arrow::utf8(), /*nullable=*/false)});
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk,
+            ::arrow::TableFromJSON(schema, {R"([["foo"],["bar"]])"}),
+            /*max_row_group_length=*/2,
+            /*page_size=*/1024);
+  ASSERT_NO_FATAL_FAILURE(ReadSizeStatistics());
+  ASSERT_NO_FATAL_FAILURE(ReadData());
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{},
+                                                    /*rep_levels=*/{},
+                                                    /*byte_array_bytes=*/6}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{},
+                                                        /*rep_levels=*/{},
+                                                        /*byte_array_bytes=*/{6}}));
+}
+
 }  // namespace parquet
