Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Dec 8, 2024
1 parent a7e95b4 commit bbd6b3f
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -157,7 +157,9 @@ case class CometColumnarToRowExec(child: SparkPlan)
}

val writableColumnVectorClz = classOf[WritableColumnVector].getName
val constantColumnVectorClz = classOf[ConstantColumnVector].getName

// scalastyle:off line.size.limit
s"""
|if ($batch == null) {
| $nextBatchFuncName();
Expand All @@ -174,7 +176,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
|
| // Comet fix for SPARK-50235
| for (int i = 0; i < ${colVars.length}; i++) {
| if (!($batch.column(i) instanceof $writableColumnVectorClz)) {
| if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz)) {
| $batch.column(i).close();
| }
| }
Expand All @@ -187,6 +189,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
| $batch.close();
|}
""".stripMargin
// scalastyle:on line.size.limit
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
Expand Down

0 comments on commit bbd6b3f

Please sign in to comment.