diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 11c6d14dc..9780217fe 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -534,6 +534,7 @@ public void close() throws IOException {
 
   @SuppressWarnings("deprecation")
   private boolean loadNextRowGroupIfNecessary() throws Throwable {
+    System.out.println("loadNextRowGroupIfNecessary");
     // More rows can be read from loaded row group. No need to load next one.
     if (rowsRead != totalRowsLoaded) return true;
 
diff --git a/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java b/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
index d4e748a9b..6118025c6 100644
--- a/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
+++ b/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
@@ -28,8 +28,6 @@
 
 import scala.collection.JavaConverters;
 
-import org.junit.Test;
-
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -90,7 +88,6 @@ public class TestColumnReader {
           (v, i) -> v.getDecimal(i, 18, 10),
           (v, i) -> v.getDecimal(i, 19, 5));
 
-  @Test
   public void testConstantVectors() {
     for (int i = 0; i < TYPES.size(); i++) {
       DataType type = TYPES.get(i);
@@ -138,7 +135,6 @@ public void testConstantVectors() {
     }
   }
 
-  @Test
   public void testRowIndexColumnVectors() {
     StructField field = StructField.apply("f", LongType, false, null);
     int bigBatchSize = BATCH_SIZE * 2;
@@ -174,7 +170,6 @@ public void testRowIndexColumnVectors() {
     reader.close();
   }
 
-  @Test
   public void testIsFixedLength() {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
 
diff --git a/native/core/src/common/buffer.rs b/native/core/src/common/buffer.rs
index d8398fd17..0c6f362ed 100644
--- a/native/core/src/common/buffer.rs
+++ b/native/core/src/common/buffer.rs
@@ -170,7 +170,7 @@ impl CometBuffer {
     /// operators, they are responsible for copying content out of the buffers.
     pub unsafe fn to_arrow(&self) -> Result<ArrowBuffer, ExecutionError> {
         let ptr = NonNull::new_unchecked(self.data.as_ptr());
-        Ok(ArrowBuffer::from_custom_allocation(ptr, self.len, Arc::new(CometBufferAllocation::new())))
+        Ok(ArrowBuffer::from_custom_allocation(ptr, self.len, self.allocation.clone()))
         /*
         if Arc::strong_count(&self.allocation) > 1 {
             Err(ExecutionError::GeneralError(
@@ -183,6 +183,10 @@ impl CometBuffer {
         */
     }
 
+    pub fn check(&self) {
+        println!("buffer strong count: {}", Arc::strong_count(&self.allocation));
+    }
+
    /// Resets this buffer by filling all bytes with zeros.
    pub fn reset(&mut self) {
        debug_assert!(self.owned, "cannot modify un-owned buffer");
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index 8eeda8a5a..7e62183ea 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -264,11 +264,11 @@ fn copy_or_unpack_array(array: &Arc<dyn Array>, mode: &CopyMode) -> Result<ArrayRef, DataFusionError> {
     if mode == &CopyMode::UnpackOrDeepCopy {
diff --git a/native/core/src/execution/utils.rs b/native/core/src/execution/utils.rs
index 553d42606..c5f35adcf 100644
--- a/native/core/src/execution/utils.rs
+++ b/native/core/src/execution/utils.rs
@@ -92,6 +92,7 @@ impl SparkArrowConvert for ArrayData {
     /// Returned pointers are Arc-ed and should be freed manually.
     #[allow(clippy::arc_with_non_send_sync)]
     fn to_spark(&self) -> Result<(i64, i64), ExecutionError> {
+        println!("to_spark");
         let arrow_array = Arc::new(FFI_ArrowArray::new(self));
         let arrow_schema = Arc::new(FFI_ArrowSchema::try_from(self.data_type())?);
 
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index f549c439f..7b39e0b25 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -495,6 +495,13 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_resetBatch(
     handle: jlong,
 ) {
     try_unwrap_or_throw(&env, |_| {
+        let ctx = get_context(handle)?;
+        println!("resetBatch: {}, arrays is some: {}", handle, ctx.arrays.is_some());
+
+        ctx.arrays.iter().for_each(|(array, _schema)| {
+            println!("resetBatch. array is_released: {}", array.is_released());
+        });
+
         let reader = get_reader(handle)?;
         reader.reset_batch();
         Ok(())
@@ -541,10 +548,13 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
     handle: jlong,
 ) -> jlongArray {
     try_unwrap_or_throw(&e, |env| {
-        println!("currentBatch");
         let ctx = get_context(handle)?;
+        println!("currentBatch: {}. arrays: {}", handle, ctx.arrays.is_some());
 
         // ctx.arrays = None;
+        ctx.arrays.iter().for_each(|(array, _schema)| {
+            println!("array is_released: {}", array.is_released());
+        });
 
         let reader = &mut ctx.column_reader;
         let data = reader.current_batch()?;
@@ -553,6 +563,7 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
         unsafe {
             let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
             let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
+            println!("overwrite arrays");
             ctx.arrays = Some((arrow_array, arrow_schema));
 
             let res = env.new_long_array(2)?;
@@ -587,7 +598,18 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_closeColumnReader(
 ) {
     try_unwrap_or_throw(&env, |_| {
         unsafe {
+            println!("closeColumnReader: {}", handle);
+
+            let ctx = get_context(handle)?;
+
+            ctx.arrays.iter().for_each(|(array, _schema)| {
+                println!("closeColumnReader. array is_released: {}", array.is_released());
+            });
+
+            let reader = &mut ctx.column_reader;
+            reader.check();
+
             let ctx = handle as *mut Context;
             let _ = Box::from_raw(ctx);
         };
         Ok(())
diff --git a/native/core/src/parquet/mutable_vector.rs b/native/core/src/parquet/mutable_vector.rs
index b67a590ae..eea8994c3 100644
--- a/native/core/src/parquet/mutable_vector.rs
+++ b/native/core/src/parquet/mutable_vector.rs
@@ -203,6 +203,7 @@ impl ParquetMutableVector {
         } else {
             self.arrow_type.clone()
         };
+        println!("get_array_data: data_type: {:?}, num_nulls: {}", data_type, self.num_nulls);
         let mut builder = ArrayData::builder(data_type)
             .len(self.num_values)
             .add_buffer(self.value_buffer.to_arrow()?)
@@ -217,8 +218,10 @@ impl ParquetMutableVector {
         if let Some(d) = &mut self.dictionary {
             builder = builder.add_child_data(d.get_array_data()?);
         }
-
-        Ok(builder.build_unchecked())
+        println!("before building array data");
+        let d = Ok(builder.build_unchecked());
+        println!("after building array data");
+        d
     }
 }
 
@@ -249,4 +252,9 @@ impl ParquetMutableVector {
         // - Decimal may pad buffer with 0xff so we need to clear them before a new batch
         matches!(dt, ArrowDataType::Boolean | ArrowDataType::Decimal128(_, _))
     }
+
+    pub fn check(&self) -> bool {
+        self.validity_buffer.check();
+        true
+    }
 }
diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs
index 3dc19db62..73d61e763 100644
--- a/native/core/src/parquet/read/column.rs
+++ b/native/core/src/parquet/read/column.rs
@@ -615,6 +615,10 @@ impl ColumnReader {
     pub fn skip_batch(&mut self, total: usize, put_nulls: bool) -> usize {
         make_func_mut!(self, skip_batch, total, put_nulls)
     }
+
+    pub fn check(&self) -> bool {
+        make_func!(self, check)
+    }
 }
 
 /// A batched reader for a primitive Parquet column.
@@ -954,6 +958,10 @@ impl TypedColumnReader {
         self.vector.num_values += len;
     }
 
+    pub fn check(&self) -> bool {
+        self.vector.check()
+    }
+
     /// Check a few pre-conditions for setting constants, as well as setting
     /// that `is_const` to true for the particular column reader.
     fn check_const(&mut self, method_name: &str) {
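
Note (not part of the patch): the one substantive change above is in CometBuffer::to_arrow, which now clones the buffer's own `self.allocation` Arc into each exported Arrow buffer instead of minting a fresh `Arc::new(CometBufferAllocation::new())` per call. That is what makes the `Arc::strong_count` printed by the debug `check()` helpers meaningful: every live Arrow view of the memory now holds a reference on the same Arc. Below is a minimal standalone sketch of the pattern against arrow-rs's `Buffer::from_custom_allocation`; `TrackedAllocation` and the `main` driver are hypothetical illustrations, not Comet code.

use std::ptr::NonNull;
use std::sync::Arc;

use arrow_buffer::alloc::Allocation;
use arrow_buffer::Buffer;

// Hypothetical stand-in for Comet's CometBufferAllocation: any
// RefUnwindSafe + Send + Sync type can act as the tracked owner.
#[derive(Debug)]
struct TrackedAllocation;
impl Allocation for TrackedAllocation {}

fn main() {
    // Leak a small region so the pointer stays valid for the demo;
    // CometBuffer instead owns the memory and frees it on drop.
    let data: &'static mut [u8] = Box::leak(vec![0u8; 64].into_boxed_slice());
    let ptr = NonNull::new(data.as_mut_ptr()).unwrap();
    let allocation = Arc::new(TrackedAllocation);

    // Cloning one shared Arc into every exported buffer (rather than a
    // fresh Arc per export) makes strong_count reflect outstanding views.
    let buf = unsafe { Buffer::from_custom_allocation(ptr, 64, allocation.clone()) };
    println!("buffer strong count: {}", Arc::strong_count(&allocation)); // 2

    drop(buf);
    println!("buffer strong count: {}", Arc::strong_count(&allocation)); // 1: safe to reuse
}

With the old code, each call to to_arrow created its own allocation token, so the count printed by check() was always 1 and a still-referenced buffer could be reset or freed underneath an Arrow array; sharing the Arc is what lets the commented-out strong_count guard (and these debug prints) actually detect that case.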