Skip to content

Commit

Permalink
[VL] Fix Arrow ColumnarBatch cannnot revoke rowIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Aug 14, 2024
1 parent db799a4 commit 2012a33
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testOffloadAndLoad() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final int numRows = 100;
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
final ColumnarBatch batch = newArrowBatch(numRows);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
Expand All @@ -52,6 +52,7 @@ public void testOffloadAndLoad() {
false)
.count();
Assert.assertEquals(numRows, cnt);
Assert.assertEquals(loaded.getRow(0).getInt(1), 15);
loaded.close();
return null;
});
Expand Down Expand Up @@ -101,4 +102,23 @@ private static ColumnarBatch newArrowBatch(String schema, int numRows) {
batch.setNumRows(numRows);
return batch;
}

private static ColumnarBatch newArrowBatch(int numRows) {
String schema = "a boolean, b int";
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
ArrowWritableColumnVector col1 = columns[0];
ArrowWritableColumnVector col2 = columns[1];
for (int j = 0; j < numRows; j++) {
col1.putBoolean(j, j % 2 == 0);
col2.putInt(j, 15 - j);
}
col2.putNull(numRows - 1);
for (ArrowWritableColumnVector col : columns) {
col.setValueCount(numRows);
}
final ColumnarBatch batch = new ColumnarBatch(columns);
batch.setNumRows(numRows);
return batch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
import org.apache.gluten.utils.ImplicitClass;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -186,23 +185,14 @@ private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input
ColumnarBatch output =
ArrowAbiUtil.importToSparkColumnarBatch(allocator, arrowSchema, cArray);

// Follow gluten input's reference count. This might be optimized using
// automatic clean-up or once the extensibility of ColumnarBatch is enriched
// Loaded Arrow ColumnarBatch lifecycle is controlled by the caller. The GC can help clean it.
IndicatorVector giv = (IndicatorVector) input.column(0);
ImplicitClass.ArrowColumnarBatchRetainer retainer =
new ImplicitClass.ArrowColumnarBatchRetainer(output);
for (long i = 0; i < (giv.refCnt() - 1); i++) {
retainer.retain();
}

// close the input one
for (long i = 0; i < giv.refCnt(); i++) {
input.close();
}

// populate new vectors to input
transferVectors(output, input);
return input;
return output;
}
}

Expand Down

0 comments on commit 2012a33

Please sign in to comment.