From 9801801df3f75339f705264252c0eac189f5f2a3 Mon Sep 17 00:00:00 2001 From: mwish <1506118561@qq.com> Date: Mon, 20 Jan 2025 19:02:11 +0800 Subject: [PATCH] GH-31992: [C++][Parquet] Handling the special case when DataPageV2 values buffer is empty (#45252) ### Rationale for this change In DataPageV2, the levels and data will not be compressed together. So, we might get the "empty" data page buffer. When meeting this, Snappy C++ will failed to decompress the `(input_len == 0, output_len == 0)` data. ### What changes are included in this PR? Handling the case in `column_reader.cc` ### Are these changes tested? * [x] Will add ### Are there any user-facing changes? Minor fix * GitHub Issue: #31992 Lead-authored-by: mwish Co-authored-by: mwish <1506118561@qq.com> Co-authored-by: Antoine Pitrou Signed-off-by: mwish --- cpp/src/parquet/column_reader.cc | 22 ++++++++++++------ cpp/src/parquet/column_reader_test.cc | 2 +- cpp/src/parquet/column_writer_test.cc | 33 +++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 2a3bbf76d1c6e..8d4169b034ac6 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -580,13 +580,21 @@ std::shared_ptr SerializedPageReader::DecompressIfNeeded( memcpy(decompressed, page_buffer->data(), levels_byte_len); } - // Decompress the values - PARQUET_ASSIGN_OR_THROW( - auto decompressed_len, - decompressor_->Decompress(compressed_len - levels_byte_len, - page_buffer->data() + levels_byte_len, - uncompressed_len - levels_byte_len, - decompression_buffer_->mutable_data() + levels_byte_len)); + // GH-31992: DataPageV2 may store only levels and no values when all + // values are null. In this case, Parquet java is known to produce a + // 0-len compressed area (which is invalid compressed input). + // See https://github.com/apache/parquet-java/issues/3122 + int64_t decompressed_len = 0; + if (uncompressed_len - levels_byte_len != 0) { + // Decompress the values + PARQUET_ASSIGN_OR_THROW( + decompressed_len, + decompressor_->Decompress( + compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len, + uncompressed_len - levels_byte_len, + decompression_buffer_->mutable_data() + levels_byte_len)); + } + if (decompressed_len != uncompressed_len - levels_byte_len) { throw ParquetException("Page didn't decompress to expected size, expected: " + std::to_string(uncompressed_len - levels_byte_len) + diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index f3d580ab5d345..87514d87db636 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -465,7 +465,7 @@ TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) { int32_t values[batch_size]; int64_t values_read; ASSERT_TRUE(reader->HasNext()); - EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*replevels=*/nullptr, + EXPECT_EQ(4, reader->ReadBatch(batch_size, def_levels_out, /*rep_levels=*/nullptr, values, &values_read)); EXPECT_EQ(3, values_read); } diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 25446aefd6814..744859cf0f037 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -1774,5 +1774,38 @@ TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) { ASSERT_EQ("bar", value); } +TEST_F(TestValuesWriterInt32Type, AllNullsCompressionInPageV2) { + // GH-31992: In DataPageV2, the levels and data will not be compressed together, + // so, when all values are null, the compressed values should be empty. And + // we should handle this case correctly. + std::vector compressions = {Compression::SNAPPY, Compression::GZIP, + Compression::ZSTD, Compression::BROTLI, + Compression::LZ4}; + for (auto compression : compressions) { + if (!Codec::IsAvailable(compression)) { + continue; + } + ARROW_SCOPED_TRACE("compression = ", Codec::GetCodecAsString(compression)); + // Optional and non-repeated, with definition levels + // but no repetition levels + this->SetUpSchema(Repetition::OPTIONAL); + this->GenerateData(SMALL_SIZE); + std::fill(this->def_levels_.begin(), this->def_levels_.end(), 0); + ColumnProperties column_properties; + column_properties.set_compression(compression); + + auto writer = + this->BuildWriter(SMALL_SIZE, column_properties, ParquetVersion::PARQUET_2_LATEST, + ParquetDataPageVersion::V2); + writer->WriteBatch(this->values_.size(), this->def_levels_.data(), nullptr, + this->values_ptr_); + writer->Close(); + + ASSERT_EQ(100, this->metadata_num_values()); + this->ReadColumn(compression); + ASSERT_EQ(0, this->values_read_); + } +} + } // namespace test } // namespace parquet