diff --git a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 0e01c9d5d82fa..e271fbe4cd99a 100644
--- a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -334,17 +334,16 @@ case class ColumnarArrowEvalPythonExec(
       val inputBatchIter = contextAwareIterator.map {
         inputCb =>
           start_time = System.nanoTime()
-          ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
-          ColumnarBatches.retain(inputCb)
+          val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
           // 0. cache input for later merge
-          inputCbCache += inputCb
-          numInputRows += inputCb.numRows
+          inputCbCache += loadedCb
+          numInputRows += loadedCb.numRows
           // We only need to pass the referred cols data to python worker for evaluation.
           var colsForEval = new ArrayBuffer[ColumnVector]()
           for (i <- originalOffsets) {
-            colsForEval += inputCb.column(i)
+            colsForEval += loadedCb.column(i)
           }
-          new ColumnarBatch(colsForEval.toArray, inputCb.numRows())
+          new ColumnarBatch(colsForEval.toArray, loadedCb.numRows())
       }
       val outputColumnarBatchIterator =
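
The core of this patch appears to be that ColumnarBatches.ensureLoaded returns the loaded batch rather than loading its argument in place: the old code discarded that return value and kept using inputCb, so downstream code (the cache, row counters, and the batch handed to the Python worker) could see the unloaded batch; the explicit ColumnarBatches.retain(inputCb) is dropped at the same time, presumably because the returned copy carries its own reference. The sketch below illustrates only that general pattern under those assumptions; Batch and ensureLoaded here are hypothetical stand-ins, not Gluten's real ColumnarBatch or ColumnarBatches API.

object EnsureLoadedSketch extends App {
  // Hypothetical stand-in for a columnar batch; not Gluten's ColumnarBatch.
  final case class Batch(rows: Int, loaded: Boolean)

  // Hypothetical analogue of ColumnarBatches.ensureLoaded: returns a loaded
  // copy of the batch and leaves the argument untouched.
  def ensureLoaded(b: Batch): Batch =
    if (b.loaded) b else b.copy(loaded = true)

  val input = Batch(rows = 3, loaded = false)

  // Pre-patch pattern: the return value is dropped, so `input` stays
  // unloaded and every later use operates on the unloaded batch.
  ensureLoaded(input)
  assert(!input.loaded)

  // Post-patch pattern: capture the returned batch and reference it
  // everywhere, mirroring the patch's `val loadedCb = ensureLoaded(...)`.
  val loadedCb = ensureLoaded(input)
  assert(loadedCb.loaded && loadedCb.rows == 3)
}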