chore: Refactor Arrow Array and Schema allocation in ColumnReader
viirya committed Oct 31, 2024
1 parent cb3e977 commit 94ef085
Showing 4 changed files with 70 additions and 63 deletions.
83 changes: 46 additions & 37 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -28,6 +28,8 @@
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -50,6 +52,7 @@

 public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
+  protected final BufferAllocator ALLOCATOR = new RootAllocator();

   /**
    * The current Comet vector holding all the values read by this column reader. Owned by this
@@ -87,6 +90,9 @@ public class ColumnReader extends AbstractColumnReader {

   private final CometSchemaImporter importer;

+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -201,53 +207,56 @@ public CometDecodedVector loadVector() {
     boolean isUuid =
         logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;

-    long[] addresses = Native.currentBatch(nativeHandle);
-
-    try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = importer.importVector(array, schema);
-
-      DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
-
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
-
-      // Update whether the current vector contains any null values. This is used in the following
-      // batch(s) to determine whether we can skip loading the native vector.
-      hadNull = cometVector.hasNull();
-
-      if (dictionaryEncoding == null) {
-        if (dictionary != null) {
-          // This means the column was using dictionary encoding but now has fall-back to plain
-          // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
-          // a condition to check if we can re-use vector later.
-          dictionary = null;
-        }
-        // Either the column is not dictionary encoded, or it was using dictionary encoding but
-        // a new data page has switched back to use plain encoding. For both cases we should
-        // return plain vector.
-        currentVector = cometVector;
-        return currentVector;
-      }
-
-      // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
-      // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
-      CometPlainVector dictionaryVector =
-          new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
-      if (dictionary != null) {
-        dictionary.setDictionaryVector(dictionaryVector);
-      } else {
-        dictionary = new CometDictionary(dictionaryVector);
-      }
-
-      currentVector =
-          new CometDictionaryVector(
-              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
-
-      currentVector =
-          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
-      return currentVector;
-    }
+    array = ArrowArray.allocateNew(ALLOCATOR);
+    schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+    long arrayAddr = array.memoryAddress();
+    long schemaAddr = schema.memoryAddress();
+
+    Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+
+    FieldVector vector = importer.importVector(array, schema);
+
+    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+
+    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+    // Update whether the current vector contains any null values. This is used in the following
+    // batch(s) to determine whether we can skip loading the native vector.
+    hadNull = cometVector.hasNull();
+
+    if (dictionaryEncoding == null) {
+      if (dictionary != null) {
+        // This means the column was using dictionary encoding but now has fall-back to plain
+        // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
+        // a condition to check if we can re-use vector later.
+        dictionary = null;
+      }
+      // Either the column is not dictionary encoded, or it was using dictionary encoding but
+      // a new data page has switched back to use plain encoding. For both cases we should
+      // return plain vector.
+      currentVector = cometVector;
+      return currentVector;
+    }
+
+    // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
+    // release the previous dictionary vector and create a new one.
+    Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
+    CometPlainVector dictionaryVector =
+        new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+    if (dictionary != null) {
+      dictionary.setDictionaryVector(dictionaryVector);
+    } else {
+      dictionary = new CometDictionary(dictionaryVector);
+    }
+
+    currentVector =
+        new CometDictionaryVector(
+            cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+
+    currentVector =
+        new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
+    return currentVector;
   }

   protected void readPage() {
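The heart of this change is an ownership inversion across the JNI boundary: previously the native side allocated the FFI structs and returned their raw addresses for Java to wrap, while now the Java side allocates empty C Data Interface structs, passes their addresses down, and imports the filled result. A minimal self-contained sketch of this caller-allocates pattern, using the same Arrow Java APIs as the diff (nativeFillBatch is a hypothetical stand-in for Native.currentBatch):

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;

public class BatchImportSketch {
  private final BufferAllocator allocator = new RootAllocator();

  // Hypothetical stand-in for Native.currentBatch(handle, arrayAddr, schemaAddr).
  private static native void nativeFillBatch(long handle, long arrayAddr, long schemaAddr);

  FieldVector loadBatch(long nativeHandle) {
    // Java allocates the empty C Data Interface structs and owns them.
    ArrowArray array = ArrowArray.allocateNew(allocator);
    ArrowSchema schema = ArrowSchema.allocateNew(allocator);
    // Native exports the current batch into the caller-provided memory.
    nativeFillBatch(nativeHandle, array.memoryAddress(), schema.memoryAddress());
    // Import the filled structs into an Arrow Java vector.
    return Data.importVector(allocator, array, schema, /* provider */ null);
  }
}

Compared with the old ArrowArray.wrap(addresses[0]) approach, this removes the need for the native side to stash the exported structs just to keep them alive until Java copies them (see the ctx.arrays field deleted from mod.rs below).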
20 changes: 14 additions & 6 deletions common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -34,8 +34,12 @@
 /** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
 public class MetadataColumnReader extends AbstractColumnReader {
   private final BufferAllocator allocator = new RootAllocator();
+
   private CometVector vector;

+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
     super(type, descriptor, useDecimal128, false);
@@ -50,13 +54,17 @@ public void setBatchSize(int batchSize) {
   @Override
   public void readBatch(int total) {
     if (vector == null) {
-      long[] addresses = Native.currentBatch(nativeHandle);
-      try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-          ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-        FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
-        vector = new CometPlainVector(fieldVector, useDecimal128);
-      }
+      array = ArrowArray.allocateNew(allocator);
+      schema = ArrowSchema.allocateNew(allocator);
+
+      long arrayAddr = array.memoryAddress();
+      long schemaAddr = schema.memoryAddress();
+
+      Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+
+      FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
+      vector = new CometPlainVector(fieldVector, useDecimal128);
     }

     vector.setNumValues(total);
   }
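Note that in both readers array and schema are now long-lived fields rather than try-with-resources locals, so they are no longer released at the end of each call; they must be freed when the reader itself is torn down. The diff shown here does not include that cleanup, but a close() override handling it might look like the following sketch (assuming the superclass exposes a close() to chain to):

@Override
public void close() {
  if (array != null) {
    array.close(); // releases the C Data Interface struct allocated above
    array = null;
  }
  if (schema != null) {
    schema.close();
    schema = null;
  }
  super.close();
}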
6 changes: 3 additions & 3 deletions common/src/main/java/org/apache/comet/parquet/Native.java
@@ -192,10 +192,10 @@ public static native void setPageV2(
    * Returns the current batch constructed via 'readBatch'
    *
    * @param handle the handle to the native Parquet column reader
-   * @return a long array with 2 elements, the first is the address to native Arrow array, and the
-   *     second is the address to the Arrow schema.
+   * @param arrayAddr the memory address to the ArrowArray struct
+   * @param schemaAddr the memory address to the ArrowSchema struct
    */
-  public static native long[] currentBatch(long handle);
+  public static native void currentBatch(long handle, long arrayAddr, long schemaAddr);

   /** Set methods to set a constant value for the reader, so it'll return constant vectors */
   public static native void setNull(long handle);
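The contract change in miniature, with identifiers taken from this diff — the old signature made Java wrap native-owned memory, while the new one hands native a place to write (the two fragments are alternatives, not one program):

// Before: native allocates and returns addresses for Java to wrap.
long[] addresses = Native.currentBatch(nativeHandle);
ArrowArray array = ArrowArray.wrap(addresses[0]);
ArrowSchema schema = ArrowSchema.wrap(addresses[1]);

// After: Java allocates; native fills the caller-provided structs.
ArrowArray array = ArrowArray.allocateNew(allocator);
ArrowSchema schema = ArrowSchema.allocateNew(allocator);
Native.currentBatch(nativeHandle, array.memoryAddress(), schema.memoryAddress());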
24 changes: 7 additions & 17 deletions native/core/src/parquet/mod.rs
@@ -27,7 +27,7 @@ use std::{boxed::Box, ptr::NonNull, sync::Arc};

 use crate::errors::{try_unwrap_or_throw, CometError};

-use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow::ffi::FFI_ArrowArray;

 /// JNI exposed methods
 use jni::JNIEnv;
@@ -52,7 +52,6 @@ const STR_CLASS_NAME: &str = "java/lang/String";
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
-    pub arrays: Option<(Arc<FFI_ArrowArray>, Arc<FFI_ArrowSchema>)>,
     last_data_page: Option<GlobalRef>,
 }

@@ -110,7 +109,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
             use_decimal_128 != 0,
             use_legacy_date_timestamp != 0,
         ),
-        arrays: None,
         last_data_page: None,
     };
     let res = Box::new(ctx);
@@ -539,24 +537,16 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
     e: JNIEnv,
     _jclass: JClass,
     handle: jlong,
-) -> jlongArray {
-    try_unwrap_or_throw(&e, |env| {
+    array_addr: jlong,
+    schema_addr: jlong,
+) {
+    try_unwrap_or_throw(&e, |_env| {
         let ctx = get_context(handle)?;
         let reader = &mut ctx.column_reader;
         let data = reader.current_batch();
-        let (array, schema) = data.to_spark()?;
+        data.move_to_spark(array_addr, schema_addr)?;

-        unsafe {
-            let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
-            let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
-            ctx.arrays = Some((arrow_array, arrow_schema));
-
-            let res = env.new_long_array(2)?;
-            let buf: [i64; 2] = [array, schema];
-            env.set_long_array_region(&res, 0, &buf)
-                .expect("set long array region failed");
-            Ok(res.into_raw())
-        }
+        Ok(())
     })
 }
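On the Rust side, currentBatch no longer builds a jlongArray or parks Arc-wrapped FFI structs in the Context to keep them alive; move_to_spark writes the exported structs straight into the Java-allocated memory, handing ownership across the boundary. The body of move_to_spark is not part of this diff, so the following is only a sketch of what such an export could look like using arrow-rs primitives:

use arrow::array::ArrayData;
use arrow::error::ArrowError;
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};

/// Sketch: export `data` into caller-provided C Data Interface structs.
/// `array_addr` / `schema_addr` point at empty structs allocated by Java,
/// which owns them (and will invoke their release callbacks) afterwards.
fn move_to_spark(data: ArrayData, array_addr: i64, schema_addr: i64) -> Result<(), ArrowError> {
    let ffi_array = FFI_ArrowArray::new(&data);
    let ffi_schema = FFI_ArrowSchema::try_from(data.data_type())?;
    unsafe {
        // Overwrite the empty structs in place; their release callbacks now
        // keep the Rust buffers alive until the Java side is done with them.
        std::ptr::write_unaligned(array_addr as *mut FFI_ArrowArray, ffi_array);
        std::ptr::write_unaligned(schema_addr as *mut FFI_ArrowSchema, ffi_schema);
    }
    Ok(())
}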
