diff --git a/native/Cargo.toml b/native/Cargo.toml
index 4b89231c7..921e9950a 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -63,4 +63,4 @@ debug = true
 overflow-checks = false
 lto = "thin"
 codegen-units = 1
-strip = "debuginfo"
+# strip = "debuginfo"
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index babc68696..9de0ef33a 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -1010,6 +1010,7 @@ impl PhysicalPlanner {
                 // Generate file groups
                 let mut file_groups: Vec<Vec<PartitionedFile>> =
                     Vec::with_capacity(partition_count);
+                println!("scan.file_partitions: {}", scan.file_partitions.len());
                 scan.file_partitions.iter().try_for_each(|partition| {
                     let mut files = Vec::with_capacity(partition.partitioned_file.len());
                     partition.partitioned_file.iter().try_for_each(|file| {
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index d88f129a3..f467f6f20 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1029,13 +1029,25 @@ class CometSparkSessionExtensions
     var firstNativeOp = true
     newPlan.transformDown {
       case op: CometNativeExec =>
-        if (firstNativeOp) {
+        val newPlan = if (firstNativeOp) {
+          // scalastyle:off println
+          println(s"first native op: ${op.nodeName}")
           firstNativeOp = false
           op.convertBlock()
         } else {
+          // scalastyle:off println
+          println(s"next native op: ${op.nodeName}")
           op
         }
+
+        if (op.children.isEmpty) {
+          firstNativeOp = true
+        }
+
+        newPlan
       case op =>
+        // scalastyle:off println
+        println(s"other op: ${op.nodeName}")
         firstNativeOp = true
         op
     }
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7b978c986..6dec381e5 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2504,8 +2504,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         scan.inputRDD match {
           case rdd: DataSourceRDD =>
             val partitions = rdd.partitions
+            // scalastyle:off println
+            println(s"partitions: ${partitions.length}")
+            var numPart = 0
             partitions.foreach(p => {
               val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
+              numPart = numPart + inputPartitions.length
               inputPartitions.foreach(partition => {
                 partition2Proto(
                   partition.asInstanceOf[FilePartition],
@@ -2513,7 +2517,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
                   scan.relation.partitionSchema)
               })
             })
+            println(s"inputPartitions: ${numPart}")
           case rdd: FileScanRDD =>
+            // scalastyle:off println
+            println(s"partitions: ${rdd.filePartitions.length}")
             rdd.filePartitions.foreach(partition => {
               partition2Proto(partition, nativeScanBuilder, scan.relation.partitionSchema)
             })
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index a54b70ea4..49848e5f4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -64,6 +64,21 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("limit 2 (cartesian product)") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
+        withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
+          val df = sql("SELECT tbl_a._1, tbl_b._2 FROM tbl_a JOIN tbl_b LIMIT 2")
+          df.explain()
+          checkSparkAnswerAndOperator(
+            df,
+            classOf[CollectLimitExec],
+            classOf[CartesianProductExec])
+        }
+      }
+    }
+  }
+
   test("TopK operator should return correct results on dictionary column with nulls") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTable("test_data") {
@@ -1041,6 +1056,7 @@ class CometExecSuite extends CometTestBase {
     withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
       withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
         val df = sql("SELECT tbl_a._1, tbl_b._2 FROM tbl_a JOIN tbl_b LIMIT 2")
+        df.explain()
         checkSparkAnswerAndOperator(
           df,
           classOf[CollectLimitExec],