GH-45283: [C++][Parquet] Omit level histogram when max level is 0 (#45285)

The level histogram in size statistics can be omitted if its max level is 0. We had not implemented this and instead enforced a histogram size equal to `max_level + 1`; as a result, reading a Parquet file with an omitted level histogram would throw an exception.

Omit level histogram when max level is 0.
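
For illustration, a minimal reader-side sketch of this convention; the helper and its names (`EffectiveDefLevelHistogram`, `num_values`) are hypothetical and not part of the Parquet API:

#include <cstdint>
#include <vector>

// Hypothetical helper (not part of the Parquet API): recover the definition
// level histogram that a writer omitted because max_definition_level == 0.
// In that case every value is at level 0, so the single bucket must equal the
// number of values in the column chunk or data page.
std::vector<int64_t> EffectiveDefLevelHistogram(
    const std::vector<int64_t>& written_histogram,  // empty when omitted
    int16_t max_definition_level, int64_t num_values) {
  if (!written_histogram.empty()) {
    return written_histogram;  // written form always has max_level + 1 buckets
  }
  if (max_definition_level == 0) {
    return {num_values};  // implicit single-bucket histogram
  }
  return {};  // histogram genuinely missing (writer chose not to record it)
}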

Yes, a test case has been added to reflect the change.

There are no user-facing changes.
* GitHub Issue: #45283

Lead-authored-by: Gang Wu <ustcwg@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Gang Wu <ustcwg@gmail.com>
2 people authored and amoeba committed Jan 31, 2025
1 parent a2d1928 commit 36bdab5
Showing 3 changed files with 126 additions and 28 deletions.
9 changes: 5 additions & 4 deletions cpp/src/parquet/column_writer.cc
@@ -1606,11 +1606,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     }
 
     auto add_levels = [](std::vector<int64_t>& level_histogram,
-                         ::arrow::util::span<const int16_t> levels) {
-      for (int16_t level : levels) {
-        ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
-        ++level_histogram[level];
+                         ::arrow::util::span<const int16_t> levels, int16_t max_level) {
+      if (max_level == 0) {
+        return;
       }
+      ARROW_DCHECK_EQ(static_cast<size_t>(max_level) + 1, level_histogram.size());
+      ::parquet::UpdateLevelHistogram(levels, level_histogram);
     };
 
     if (descr_->max_definition_level() > 0) {
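
For context, a standalone sketch of what the guarded update amounts to; this is an illustrative stand-in, not the actual `::parquet::UpdateLevelHistogram` implementation:

#include <cassert>
#include <cstdint>
#include <vector>

// Illustrative stand-in for the update that add_levels now guards: one bucket
// per level value, skipped entirely when max_level == 0 because the histogram
// is omitted in that case.
void AddLevels(std::vector<int64_t>& level_histogram,
               const std::vector<int16_t>& levels, int16_t max_level) {
  if (max_level == 0) {
    return;  // histogram omitted; nothing to accumulate
  }
  assert(level_histogram.size() == static_cast<size_t>(max_level) + 1);
  for (int16_t level : levels) {
    ++level_histogram[level];  // each level value indexes its own bucket
  }
}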
40 changes: 26 additions & 14 deletions cpp/src/parquet/size_statistics.cc
@@ -54,23 +54,28 @@ void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) {
 }
 
 void SizeStatistics::Validate(const ColumnDescriptor* descr) const {
-  if (repetition_level_histogram.size() !=
-      static_cast<size_t>(descr->max_repetition_level() + 1)) {
-    throw ParquetException("Repetition level histogram size mismatch");
-  }
-  if (definition_level_histogram.size() !=
-      static_cast<size_t>(descr->max_definition_level() + 1)) {
-    throw ParquetException("Definition level histogram size mismatch");
-  }
+  auto validate_histogram = [](const std::vector<int64_t>& histogram, int16_t max_level,
+                               const std::string& name) {
+    if (histogram.empty()) {
+      // A levels histogram is always allowed to be missing.
+      return;
+    }
+    if (histogram.size() != static_cast<size_t>(max_level + 1)) {
+      std::stringstream ss;
+      ss << name << " level histogram size mismatch, size: " << histogram.size()
+         << ", expected: " << (max_level + 1);
+      throw ParquetException(ss.str());
+    }
+  };
+  validate_histogram(repetition_level_histogram, descr->max_repetition_level(),
+                     "Repetition");
+  validate_histogram(definition_level_histogram, descr->max_definition_level(),
+                     "Definition");
   if (unencoded_byte_array_data_bytes.has_value() &&
       descr->physical_type() != Type::BYTE_ARRAY) {
     throw ParquetException("Unencoded byte array data bytes does not support " +
                            TypeToString(descr->physical_type()));
   }
   if (!unencoded_byte_array_data_bytes.has_value() &&
       descr->physical_type() == Type::BYTE_ARRAY) {
     throw ParquetException("Missing unencoded byte array data bytes");
   }
 }
 
@@ -83,8 +88,15 @@ void SizeStatistics::Reset() {
 
 std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* descr) {
   auto size_stats = std::make_unique<SizeStatistics>();
-  size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0);
-  size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0);
+  // If the max level is 0, the level histogram can be omitted because it contains
+  // only a single level (i.e. level 0) whose count is equivalent to `num_values` of
+  // the column chunk or data page.
+  if (descr->max_repetition_level() != 0) {
+    size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0);
+  }
+  if (descr->max_definition_level() != 0) {
+    size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0);
+  }
   if (descr->physical_type() == Type::BYTE_ARRAY) {
     size_stats->unencoded_byte_array_data_bytes = 0;
   }
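
To see the new Make/Validate behavior end to end, a hypothetical, untested snippet, assuming the `parquet/schema.h` and `parquet/size_statistics.h` headers are available: for a flat REQUIRED column both max levels are 0, so `Make` allocates no histograms and `Validate` accepts them as empty.

#include <iostream>

#include "parquet/schema.h"
#include "parquet/size_statistics.h"

int main() {
  // Flat REQUIRED column: max repetition and definition levels are both 0.
  auto node = parquet::schema::PrimitiveNode::Make(
      "a", parquet::Repetition::REQUIRED, parquet::Type::BYTE_ARRAY);
  parquet::ColumnDescriptor descr(node, /*max_definition_level=*/0,
                                  /*max_repetition_level=*/0);

  auto stats = parquet::SizeStatistics::Make(&descr);
  std::cout << stats->repetition_level_histogram.size() << "\n";  // 0 (omitted)
  std::cout << stats->definition_level_histogram.size() << "\n";  // 0 (omitted)
  stats->Validate(&descr);  // no longer throws for the empty histograms
  return 0;
}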
105 changes: 95 additions & 10 deletions cpp/src/parquet/size_statistics_test.cc
@@ -168,12 +168,22 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
     }
   }
 
-  void Reset() {
-    buffer_.reset();
-    row_group_stats_.clear();
-    page_stats_.clear();
+  void ReadData() {
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));
+    auto metadata = reader->metadata();
+    for (int i = 0; i < metadata->num_row_groups(); ++i) {
+      int64_t num_rows = metadata->RowGroup(i)->num_rows();
+      auto row_group_reader = reader->RowGroup(i);
+      for (int j = 0; j < metadata->num_columns(); ++j) {
+        auto column_reader = row_group_reader->RecordReader(j);
+        ASSERT_EQ(column_reader->ReadRecords(num_rows + 1), num_rows);
+      }
+    }
   }
 
+  void Reset() { buffer_.reset(); }
+
  protected:
   std::shared_ptr<Buffer> buffer_;
   std::vector<SizeStatistics> row_group_stats_;
@@ -256,24 +266,99 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
   ReadSizeStatistics();
   EXPECT_THAT(row_group_stats_,
               ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/5},
                                      SizeStatistics{/*def_levels=*/{1, 1},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/1},
                                      SizeStatistics{/*def_levels=*/{0, 2},
-                                                    /*rep_levels=*/{2},
+                                                    /*rep_levels=*/{},
                                                     /*byte_array_bytes=*/4}));
   EXPECT_THAT(page_stats_,
               ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         /*byte_array_bytes=*/{5}},
                                      PageSizeStatistics{/*def_levels=*/{1, 1},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         /*byte_array_bytes=*/{1}},
                                      PageSizeStatistics{/*def_levels=*/{0, 2},
-                                                        /*rep_levels=*/{2},
+                                                        /*rep_levels=*/{},
                                                         /*byte_array_bytes=*/{4}}));
 }
 
+TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) {
+  // Rep/def level histograms are updated in batches of `write_batch_size` levels
+  // inside a single page. Exercise the logic with more than one batch per page.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::list(::arrow::utf8()))});
+  auto table = ::arrow::TableFromJSON(schema, {R"([
+      [ [null,"a","ab"] ],
+      [ null ],
+      [ [] ],
+      [ [null,"d","de"] ],
+      [ ["g","gh",null] ],
+      [ ["j","jk",null] ]
+    ])"});
+  for (int write_batch_size : {100, 5, 4, 3, 2, 1}) {
+    ARROW_SCOPED_TRACE("write_batch_size = ", write_batch_size);
+    WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+              /*max_row_group_length=*/1000, /*page_size=*/1000, write_batch_size);
+    ReadSizeStatistics();
+    EXPECT_THAT(row_group_stats_,
+                ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                      /*rep_levels=*/{6, 8},
+                                                      /*byte_array_bytes=*/12}));
+    EXPECT_THAT(page_stats_,
+                ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{1, 1, 4, 8},
+                                                          /*rep_levels=*/{6, 8},
+                                                          /*byte_array_bytes=*/{12}}));
+  }
+}
+
+TEST_F(SizeStatisticsRoundTripTest, LargePage) {
+  // When max_level is 1, the levels are summed in chunks of 2**30 levels;
+  // exercise this by testing with a 90000-row table.
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::utf8())});
+  auto seed_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+      [ "a" ],
+      [ "bc" ],
+      [ null ]
+    ])");
+  ASSERT_OK_AND_ASSIGN(auto table, ::arrow::Table::FromRecordBatches(
+                                       ::arrow::RecordBatchVector(30000, seed_batch)));
+  ASSERT_OK_AND_ASSIGN(table, table->CombineChunks());
+  ASSERT_EQ(table->num_rows(), 90000);
+
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
+            /*max_row_group_length=*/1 << 30, /*page_size=*/1 << 30,
+            /*write_batch_size=*/50000);
+  ReadSizeStatistics();
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{30000, 60000},
+                                                    /*rep_levels=*/{},
+                                                    /*byte_array_bytes=*/90000}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{30000, 60000},
+                                                        /*rep_levels=*/{},
+                                                        /*byte_array_bytes=*/{90000}}));
+}
+
+TEST_F(SizeStatisticsRoundTripTest, MaxLevelZero) {
+  auto schema =
+      ::arrow::schema({::arrow::field("a", ::arrow::utf8(), /*nullable=*/false)});
+  WriteFile(SizeStatisticsLevel::PageAndColumnChunk,
+            ::arrow::TableFromJSON(schema, {R"([["foo"],["bar"]])"}),
+            /*max_row_group_length=*/2,
+            /*page_size=*/1024);
+  ASSERT_NO_FATAL_FAILURE(ReadSizeStatistics());
+  ASSERT_NO_FATAL_FAILURE(ReadData());
+  EXPECT_THAT(row_group_stats_,
+              ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{},
+                                                    /*rep_levels=*/{},
+                                                    /*byte_array_bytes=*/6}));
+  EXPECT_THAT(page_stats_,
+              ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{},
+                                                        /*rep_levels=*/{},
+                                                        /*byte_array_bytes=*/{6}}));
+}
+
 } // namespace parquet
