Skip to content

Commit

Permalink
GH-31992: [C++][Parquet] Handling the special case when DataPageV2 va…
Browse files Browse the repository at this point in the history
…lues buffer is empty (#45252)

### Rationale for this change

In DataPageV2, the levels and data will not be compressed together. So, we might get the "empty" data page buffer.

When meeting this, Snappy C++ will failed to decompress the `(input_len == 0, output_len == 0)` data.

### What changes are included in this PR?

Handling the case in `column_reader.cc`

### Are these changes tested?

* [x] Will add

### Are there any user-facing changes?

Minor fix

* GitHub Issue: #31992

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
mapleFU and pitrou authored Jan 20, 2025
1 parent 61c82ea commit 9801801
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 8 deletions.
22 changes: 15 additions & 7 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -580,13 +580,21 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
memcpy(decompressed, page_buffer->data(), levels_byte_len);
}

// Decompress the values
PARQUET_ASSIGN_OR_THROW(
auto decompressed_len,
decompressor_->Decompress(compressed_len - levels_byte_len,
page_buffer->data() + levels_byte_len,
uncompressed_len - levels_byte_len,
decompression_buffer_->mutable_data() + levels_byte_len));
// GH-31992: DataPageV2 may store only levels and no values when all
// values are null. In this case, Parquet java is known to produce a
// 0-len compressed area (which is invalid compressed input).
// See https://github.com/apache/parquet-java/issues/3122
int64_t decompressed_len = 0;
if (uncompressed_len - levels_byte_len != 0) {
// Decompress the values
PARQUET_ASSIGN_OR_THROW(
decompressed_len,
decompressor_->Decompress(
compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
uncompressed_len - levels_byte_len,
decompression_buffer_->mutable_data() + levels_byte_len));
}

if (decompressed_len != uncompressed_len - levels_byte_len) {
throw ParquetException("Page didn't decompress to expected size, expected: " +
std::to_string(uncompressed_len - levels_byte_len) +
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
int32_t values[batch_size];
int64_t values_read;
ASSERT_TRUE(reader->HasNext());
EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*replevels=*/nullptr,
EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*rep_levels=*/nullptr,
values, &values_read));
EXPECT_EQ(3, values_read);
}
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1774,5 +1774,38 @@ TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
ASSERT_EQ("bar", value);
}

TEST_F(TestValuesWriterInt32Type, AllNullsCompressionInPageV2) {
// GH-31992: In DataPageV2, the levels and data will not be compressed together,
// so, when all values are null, the compressed values should be empty. And
// we should handle this case correctly.
std::vector<Compression::type> compressions = {Compression::SNAPPY, Compression::GZIP,
Compression::ZSTD, Compression::BROTLI,
Compression::LZ4};
for (auto compression : compressions) {
if (!Codec::IsAvailable(compression)) {
continue;
}
ARROW_SCOPED_TRACE("compression = ", Codec::GetCodecAsString(compression));
// Optional and non-repeated, with definition levels
// but no repetition levels
this->SetUpSchema(Repetition::OPTIONAL);
this->GenerateData(SMALL_SIZE);
std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0);
ColumnProperties column_properties;
column_properties.set_compression(compression);

auto writer =
this->BuildWriter(SMALL_SIZE, column_properties, ParquetVersion::PARQUET_2_LATEST,
ParquetDataPageVersion::V2);
writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr,
this->values_ptr_);
writer->Close();

ASSERT_EQ(100, this->metadata_num_values());
this->ReadColumn(compression);
ASSERT_EQ(0, this->values_read_);
}
}

} // namespace test
} // namespace parquet

0 comments on commit 9801801

Please sign in to comment.