diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index d8e75c67f..8eeda8a5a 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -262,7 +262,13 @@ fn copy_or_unpack_array(array: &Arc<dyn Array>, mode: &CopyMode) -> Result<ArrayRef, ArrowError> {
     match array.data_type() {
         DataType::Dictionary(_, value_type) => {
             let options = CastOptions::default();
-            cast_with_options(array, value_type.as_ref(), &options)
+            // We need to copy the array after `cast` because the arrow-rs `take` kernel, which is
+            // used to unpack the dictionary array, might reuse the input array's null buffer.
+            Ok(copy_array(&cast_with_options(
+                array,
+                value_type.as_ref(),
+                &options,
+            )?))
         }
         _ => {
             if mode == &CopyMode::UnpackOrDeepCopy {
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 78f59cbea..a720842ce 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -64,6 +64,25 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("TopK operator should return correct results on dictionary column with nulls") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTable("test_data") {
+        val tableDF = spark.sparkContext
+          .parallelize(Seq((1, null, "A"), (2, "BBB", "B"), (3, "BBB", "B"), (4, "BBB", "B")), 3)
+          .toDF("c1", "c2", "c3")
+        tableDF
+          .coalesce(1)
+          .sortWithinPartitions("c1")
+          .writeTo("test_data")
+          .using("parquet")
+          .create()
+
+        val df = sql("SELECT * FROM test_data ORDER BY c1 LIMIT 3")
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
   test("DPP fallback") {
     withTempDir { path =>
       // create test data
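
Note (not part of the patch): the comment in the Rust hunk is the whole story — after unpacking a dictionary column, the resulting array may still share buffers (such as the null buffer) with the input batch, which Comet recycles, so the unpacked array must be deep-copied first. Below is a minimal standalone sketch of that idea against the arrow crate. The deep_copy helper here is a hypothetical, simplified stand-in for Comet's actual copy_array, and the example data mirrors the c2 column in the new test; it is an illustration under those assumptions, not the patch's implementation.

    use std::sync::Arc;

    use arrow::array::{
        make_array, Array, ArrayRef, DictionaryArray, Int32Array, MutableArrayData, StringArray,
    };
    use arrow::compute::{cast_with_options, CastOptions};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;

    /// Hypothetical simplified stand-in for Comet's `copy_array`: rebuild the
    /// array in freshly allocated buffers so nothing is shared with the input.
    fn deep_copy(array: &dyn Array) -> ArrayRef {
        let data = array.to_data();
        let mut mutable = MutableArrayData::new(vec![&data], false, array.len());
        mutable.extend(0, 0, array.len());
        make_array(mutable.freeze())
    }

    fn main() -> Result<(), ArrowError> {
        // A dictionary-encoded string column with a null, like `c2` in the test:
        // keys [null, 0, 0, 0] over the single value "BBB".
        let keys = Int32Array::from(vec![None, Some(0), Some(0), Some(0)]);
        let values = StringArray::from(vec!["BBB"]);
        let dict: ArrayRef = Arc::new(DictionaryArray::new(keys, Arc::new(values)));

        // Unpack to the value type, as `copy_or_unpack_array` does. The result
        // may still share buffers (e.g. the null buffer) with `dict`.
        let unpacked = cast_with_options(&dict, &DataType::Utf8, &CastOptions::default())?;

        // Deep-copying decouples the unpacked array from the recycled input
        // batch; this is what the patch wraps around `cast_with_options`.
        let copied = deep_copy(unpacked.as_ref());
        assert_eq!(copied.null_count(), 1);
        Ok(())
    }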