From adb8368985ad20e1872a50939c43488516d18b55 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 19 Aug 2024 11:13:11 +0800 Subject: [PATCH] [FLINK-36084] Optimize parquet binary getBytes with getBytesUnsafe to avoid copy cost (#25213) --- .../apache/flink/formats/parquet/vector/ParquetDictionary.java | 2 +- .../flink/formats/parquet/vector/reader/BytesColumnReader.java | 2 +- .../parquet/vector/reader/FixedLenBytesColumnReader.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java index f36c7ff4bfa69..739e5a5fde77b 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java @@ -62,7 +62,7 @@ public double decodeToDouble(int id) { @Override public byte[] decodeToBinary(int id) { - return dictionary.decodeToBinary(id).getBytes(); + return dictionary.decodeToBinary(id).getBytesUnsafe(); } @Override diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java index 2cbdbdaa3ce92..92e3ba6f4c627 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java @@ -74,7 +74,7 @@ protected void readBatchFromDictionaryIds( int rowId, int num, WritableBytesVector column, WritableIntVector dictionaryIds) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytes(); + byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe(); column.appendBytes(i, bytes, 0, bytes.length); } } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java index e82f68a17ce98..19f460cf23f9a 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java @@ -101,7 +101,7 @@ protected void readBatchFromDictionaryIds( WritableBytesVector bytesVector = (WritableBytesVector) column; for (int i = rowId; i < rowId + num; ++i) { if (!bytesVector.isNullAt(i)) { - byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytes(); + byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytesUnsafe(); bytesVector.appendBytes(i, v, 0, v.length); } }