Commit bbb3714

more

viirya committed Oct 29, 2024
1 parent eb53b20 commit bbb3714
Showing 8 changed files with 50 additions and 11 deletions.
@@ -534,6 +534,7 @@ public void close() throws IOException {

@SuppressWarnings("deprecation")
private boolean loadNextRowGroupIfNecessary() throws Throwable {
System.out.println("loadNextRowGroupIfNecessary");
// More rows can be read from loaded row group. No need to load next one.
if (rowsRead != totalRowsLoaded) return true;

@@ -28,8 +28,6 @@

import scala.collection.JavaConverters;

import org.junit.Test;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -90,7 +88,6 @@ public class TestColumnReader {
(v, i) -> v.getDecimal(i, 18, 10),
(v, i) -> v.getDecimal(i, 19, 5));

@Test
public void testConstantVectors() {
for (int i = 0; i < TYPES.size(); i++) {
DataType type = TYPES.get(i);
@@ -138,7 +135,6 @@ public void testConstantVectors() {
}
}

@Test
public void testRowIndexColumnVectors() {
StructField field = StructField.apply("f", LongType, false, null);
int bigBatchSize = BATCH_SIZE * 2;
@@ -174,7 +170,6 @@ public void testRowIndexColumnVectors() {
reader.close();
}

@Test
public void testIsFixedLength() {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);

6 changes: 5 additions & 1 deletion native/core/src/common/buffer.rs
@@ -170,7 +170,7 @@ impl CometBuffer {
/// operators, they are responsible for copying content out of the buffers.
pub unsafe fn to_arrow(&self) -> Result<ArrowBuffer, ExecutionError> {
let ptr = NonNull::new_unchecked(self.data.as_ptr());
Ok(ArrowBuffer::from_custom_allocation(ptr, self.len, Arc::new(CometBufferAllocation::new())))
Ok(ArrowBuffer::from_custom_allocation(ptr, self.len, self.allocation.clone()))
/*
if Arc::strong_count(&self.allocation) > 1 {
Err(ExecutionError::GeneralError(
Expand All @@ -183,6 +183,10 @@ impl CometBuffer {
*/
}

pub fn check(&self) {
println!("buffer strong count: {}", Arc::strong_count(&self.allocation));
}

/// Resets this buffer by filling all bytes with zeros.
pub fn reset(&mut self) {
debug_assert!(self.owned, "cannot modify un-owned buffer");
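The substantive change above is that `to_arrow` now hands the exported Arrow buffer a clone of the buffer's own allocation `Arc` instead of a freshly created `CometBufferAllocation`, so the strong count printed by the new `check` method actually reflects how many exported buffers are still alive. Below is an illustrative sketch of that idea — not part of the commit — using plain `std::sync::Arc` and made-up stand-in types rather than the real `CometBuffer`/`ArrowBuffer` API:

use std::sync::Arc;

// Stand-in for CometBufferAllocation; the real type is an opaque marker that
// ArrowBuffer::from_custom_allocation keeps alive as the buffer's owner.
struct Allocation;

struct DebugBuffer {
    allocation: Arc<Allocation>,
}

impl DebugBuffer {
    // Old behaviour: every export minted a fresh Arc, so the buffer's own
    // allocation never saw the exported reference.
    fn export_fresh(&self) -> Arc<Allocation> {
        Arc::new(Allocation)
    }

    // New behaviour: the export clones self.allocation, so strong_count is
    // 1 (the buffer itself) plus the number of live exported buffers.
    fn export_shared(&self) -> Arc<Allocation> {
        Arc::clone(&self.allocation)
    }

    fn check(&self) {
        println!("buffer strong count: {}", Arc::strong_count(&self.allocation));
    }
}

fn main() {
    let buf = DebugBuffer { allocation: Arc::new(Allocation) };
    let _fresh = buf.export_fresh();
    buf.check(); // prints 1: the fresh Arc is invisible to the buffer

    let _shared = buf.export_shared();
    buf.check(); // prints 2: the shared clone tracks the export
}

With the fresh-Arc version, the strong count stays at 1 no matter how many Arrow buffers still point at the data, which is why sharing the clone is needed for the debug check to mean anything.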
4 changes: 2 additions & 2 deletions native/core/src/execution/operators/copy.rs
@@ -264,11 +264,11 @@ fn copy_or_unpack_array(array: &Arc<dyn Array>, mode: &CopyMode) -> Result<Array
let options = CastOptions::default();
// We need to copy the array after `cast` because the arrow-rs `take` kernel, which is used
// to unpack dictionary arrays, might reuse the input array's null buffer.
Ok(copy_array(&cast_with_options(
cast_with_options(
array,
value_type.as_ref(),
&options,
)?))
)
}
_ => {
if mode == &CopyMode::UnpackOrDeepCopy {
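For context, the dictionary branch above unpacks the array through the arrow-rs cast kernel; this commit drops the `copy_array` wrapper around that call, which appears to be part of the same debugging effort as the println! statements elsewhere in the change. Below is an illustrative, self-contained sketch of the unpack step — not part of the commit — with made-up input data and the public `arrow` crate API:

use std::sync::Arc;

use arrow::array::{Array, DictionaryArray, Int32Array, StringArray};
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // A dictionary-encoded string column: keys index into a small values array.
    let keys = Int32Array::from(vec![0, 1, 0, 1]);
    let values = StringArray::from(vec!["a", "b"]);
    let dict = DictionaryArray::new(keys, Arc::new(values));

    // Unpack to a plain Utf8 array, mirroring the dictionary branch of
    // copy_or_unpack_array without the extra deep copy.
    let options = CastOptions::default();
    let unpacked = cast_with_options(&dict, &DataType::Utf8, &options)?;
    assert_eq!(unpacked.len(), 4);
    Ok(())
}

The existing comment about the `take` kernel reusing the input's null buffer is the reason the deep copy was there in the first place, so removing it trades safety for a simpler path while the buffer-reuse issue is being investigated.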
1 change: 1 addition & 0 deletions native/core/src/execution/utils.rs
@@ -92,6 +92,7 @@ impl SparkArrowConvert for ArrayData {
/// Returned pointers are Arc-ed and should be freed manually.
#[allow(clippy::arc_with_non_send_sync)]
fn to_spark(&self) -> Result<(i64, i64), ExecutionError> {
println!("to_spark");
let arrow_array = Arc::new(FFI_ArrowArray::new(self));
let arrow_schema = Arc::new(FFI_ArrowSchema::try_from(self.data_type())?);

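`to_spark` wraps the two FFI structs in `Arc`s and leaks them to the JVM as raw addresses; as the doc comment notes, they must be freed manually. A minimal illustrative sketch of that export/reclaim pattern — with a made-up stand-in type instead of the real `FFI_ArrowArray` — looks like this:

use std::sync::Arc;

// Stand-in for arrow::ffi::FFI_ArrowArray.
struct FfiArray(u64);

fn export(array: FfiArray) -> i64 {
    // Arc::into_raw leaks one strong reference; the JVM side holds the address.
    Arc::into_raw(Arc::new(array)) as i64
}

unsafe fn reclaim(addr: i64) -> Arc<FfiArray> {
    // Arc::from_raw takes that reference back; dropping the Arc frees the value
    // (unless another clone is still alive).
    Arc::from_raw(addr as *const FfiArray)
}

fn main() {
    let addr = export(FfiArray(42));
    let arr = unsafe { reclaim(addr) };
    assert_eq!(arr.0, 42);
}

The reclaim half of this pattern is what `currentBatch` in parquet/mod.rs does with `Arc::from_raw` before overwriting `ctx.arrays`.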
24 changes: 23 additions & 1 deletion native/core/src/parquet/mod.rs
@@ -495,6 +495,13 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_resetBatch(
handle: jlong,
) {
try_unwrap_or_throw(&env, |_| {
let ctx = get_context(handle)?;
println!("resetBatch: {}, arrays is some: {}", handle, ctx.arrays.is_some());

ctx.arrays.iter().for_each(|(array, schema)| {
println!("resetBatch. array is_released: {}", array.is_released());
});

let reader = get_reader(handle)?;
reader.reset_batch();
Ok(())
@@ -541,10 +548,13 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
handle: jlong,
) -> jlongArray {
try_unwrap_or_throw(&e, |env| {
println!("currentBatch");
let ctx = get_context(handle)?;
println!("currentBatch: {}. arrays: {}", handle, ctx.arrays.is_some());

// ctx.arrays = None;
ctx.arrays.iter().for_each(|(array, schema)| {
println!("array is_released: {}", array.is_released());
});

let reader = &mut ctx.column_reader;
let data = reader.current_batch()?;
@@ -553,6 +563,7 @@
unsafe {
let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
println!("overwrite arrays");
ctx.arrays = Some((arrow_array, arrow_schema));

let res = env.new_long_array(2)?;
@@ -587,7 +598,18 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_closeColumnReader(
) {
try_unwrap_or_throw(&env, |_| {
unsafe {
println!("closeColumnReader: {}", handle);
let ctx = get_context(handle)?;

ctx.arrays.iter().for_each(|(array, schema)| {
println!("closeColumnReader. array is_released: {}", array.is_released());
});

let reader = &mut ctx.column_reader;
reader.check();

let ctx = handle as *mut Context;

let _ = Box::from_raw(ctx);
};
Ok(())
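All of the JNI entry points above resolve the `jlong` handle back to a `Context`, and `closeColumnReader` finally reclaims it with `Box::from_raw`. A stripped-down sketch of that handle lifecycle — illustrative only, with made-up fields rather than the real Comet `Context` — is shown below:

// Sketch of the jlong-handle lifecycle used by the parquet JNI layer
// (illustrative names; the real Context holds the column reader and FFI arrays).
struct Context {
    value: u32,
}

fn create() -> i64 {
    // Box::into_raw hands ownership to the JVM as an opaque handle.
    Box::into_raw(Box::new(Context { value: 7 })) as i64
}

fn get_context<'a>(handle: i64) -> &'a mut Context {
    // Borrow the context for the duration of a single native call.
    unsafe { &mut *(handle as *mut Context) }
}

fn close(handle: i64) {
    // Box::from_raw reclaims ownership; dropping the Box frees the Context.
    unsafe { drop(Box::from_raw(handle as *mut Context)) };
}

fn main() {
    let handle = create();
    assert_eq!(get_context(handle).value, 7);
    close(handle);
}

The println! calls added in this file print the handle and whether the cached FFI arrays have been released at each stage of that lifecycle, to pin down where a buffer is freed while the JVM still references it.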
12 changes: 10 additions & 2 deletions native/core/src/parquet/mutable_vector.rs
@@ -203,6 +203,7 @@ impl ParquetMutableVector {
} else {
self.arrow_type.clone()
};
println!("get_array_data: data_type: {:?}, num_nulls: {}", data_type, self.num_nulls);
let mut builder = ArrayData::builder(data_type)
.len(self.num_values)
.add_buffer(self.value_buffer.to_arrow()?)
@@ -217,8 +218,10 @@
if let Some(d) = &mut self.dictionary {
builder = builder.add_child_data(d.get_array_data()?);
}

Ok(builder.build_unchecked())
println!("before building array data");
let d = Ok(builder.build_unchecked());
println!("after building array data");
d
}
}

@@ -249,4 +252,9 @@ impl ParquetMutableVector {
// - Decimal may pad buffer with 0xff so we need to clear them before a new batch
matches!(dt, ArrowDataType::Boolean | ArrowDataType::Decimal128(_, _))
}

pub fn check(&self) -> bool {
self.validity_buffer.check();
true
}
}
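`get_array_data` assembles an `ArrayData` from the vector's value and validity buffers and finalizes it with `build_unchecked`, which skips validation. A small illustrative sketch of the same builder flow — not part of the commit — using the checked `build()` and a made-up value buffer:

use arrow::array::{ArrayData, Int32Array};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Three little-endian i32 values backed by a raw Arrow buffer.
    let values = Buffer::from_slice_ref([1i32, 2, 3]);

    // Same builder pattern as get_array_data, but with the checked build();
    // the reader uses build_unchecked and trusts its own buffer bookkeeping.
    let data = ArrayData::builder(DataType::Int32)
        .len(3)
        .add_buffer(values)
        .build()?;

    let array = Int32Array::from(data);
    assert_eq!(array.value(2), 3);
    Ok(())
}

The printlns added in the hunk above bracket the `build_unchecked` call, presumably to narrow down whether array construction itself is where the failure occurs.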
8 changes: 8 additions & 0 deletions native/core/src/parquet/read/column.rs
@@ -615,6 +615,10 @@ impl ColumnReader {
pub fn skip_batch(&mut self, total: usize, put_nulls: bool) -> usize {
make_func_mut!(self, skip_batch, total, put_nulls)
}

pub fn check(&self) -> bool {
make_func!(self, check)
}
}

/// A batched reader for a primitive Parquet column.
@@ -954,6 +958,10 @@ impl<T: DataType> TypedColumnReader<T> {
self.vector.num_values += len;
}

pub fn check(&self) -> bool {
self.vector.check()
}

/// Check a few pre-conditions for setting constants, as well as setting
/// that `is_const` to true for the particular column reader.
fn check_const(&mut self, method_name: &str) {