
Commit

viirya committed Nov 20, 2024
1 parent a68ac54 commit b124a4f
Showing 5 changed files with 38 additions and 2 deletions.
2 changes: 1 addition & 1 deletion native/Cargo.toml
@@ -63,4 +63,4 @@ debug = true
 overflow-checks = false
 lto = "thin"
 codegen-units = 1
-strip = "debuginfo"
+# strip = "debuginfo"
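
Note on the change above: strip = "debuginfo" in a Cargo release profile strips debug info from the compiled native library, so commenting it out keeps symbols in the release build — presumably a temporary change so the debug output added in this commit can be correlated with usable backtraces.
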
1 change: 1 addition & 0 deletions native/core/src/execution/datafusion/planner.rs
@@ -1010,6 +1010,7 @@ impl PhysicalPlanner {
         // Generate file groups
         let mut file_groups: Vec<Vec<PartitionedFile>> =
             Vec::with_capacity(partition_count);
+        println!("scan.file_partitions: {}", scan.file_partitions.len());
         scan.file_partitions.iter().try_for_each(|partition| {
             let mut files = Vec::with_capacity(partition.partitioned_file.len());
             partition.partitioned_file.iter().try_for_each(|file| {
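
The added println! reports how many file_partitions the native planner received in the serialized scan. Read together with the counts printed from QueryPlanSerde below, it appears intended to cross-check that the partition count Spark serializes matches what the native side deserializes.
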
14 changes: 13 additions & 1 deletion spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1029,13 +1029,25 @@ class CometSparkSessionExtensions
     var firstNativeOp = true
     newPlan.transformDown {
       case op: CometNativeExec =>
-        if (firstNativeOp) {
+        val newPlan = if (firstNativeOp) {
+          // scalastyle:off println
+          println(s"first native op: ${op.nodeName}")
           firstNativeOp = false
           op.convertBlock()
         } else {
+          // scalastyle:off println
+          println(s"next native op: ${op.nodeName}")
           op
         }
+
+        if (op.children.isEmpty) {
+          firstNativeOp = true
+        }
+
+        newPlan
       case op =>
+        // scalastyle:off println
+        println(s"other op: ${op.nodeName}")
         firstNativeOp = true
         op
     }
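
For context, here is a minimal sketch of the traversal pattern being instrumented above, using hypothetical Node/NativeNode/JvmNode stand-ins rather than the real Comet plan classes. transformDown visits a node before its children, so firstNativeOp is true exactly at the topmost operator of each contiguous run of native operators — the point where convertBlock() wraps the run into a single native execution block — and it is re-armed at leaves and at any non-native operator:

// Hypothetical stand-ins for the Spark plan node classes, for illustration only.
sealed trait Node { def name: String; def children: Seq[Node] }
case class NativeNode(name: String, children: Seq[Node] = Nil) extends Node
case class JvmNode(name: String, children: Seq[Node] = Nil) extends Node

object BlockBoundaryDemo extends App {
  var firstNativeOp = true

  // Pre-order walk, like transformDown: parents before children.
  def visit(n: Node): Unit = {
    n match {
      case op: NativeNode =>
        if (firstNativeOp) {
          println(s"first native op: ${op.name}") // block boundary: convertBlock() would go here
          firstNativeOp = false
        } else {
          println(s"next native op: ${op.name}")
        }
        if (op.children.isEmpty) {
          firstNativeOp = true // a leaf closes the block; the next native op starts a new one
        }
      case op =>
        println(s"other op: ${op.name}")
        firstNativeOp = true // a non-native op breaks the native chain
    }
    n.children.foreach(visit)
  }

  visit(JvmNode("CollectLimit", Seq(NativeNode("CometProject", Seq(NativeNode("CometScan"))))))
  // Prints: other op: CollectLimit / first native op: CometProject / next native op: CometScan
}
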
7 changes: 7 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2504,16 +2504,23 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     scan.inputRDD match {
       case rdd: DataSourceRDD =>
         val partitions = rdd.partitions
+        // scalastyle:off println
+        println(s"partitions: ${partitions.length}")
+        var numPart = 0
         partitions.foreach(p => {
           val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
+          numPart = numPart + inputPartitions.length
           inputPartitions.foreach(partition => {
             partition2Proto(
               partition.asInstanceOf[FilePartition],
               nativeScanBuilder,
               scan.relation.partitionSchema)
           })
         })
+        println(s"inputPartitions: ${numPart}")
       case rdd: FileScanRDD =>
+        // scalastyle:off println
+        println(s"partitions: ${rdd.filePartitions.length}")
         rdd.filePartitions.foreach(partition => {
           partition2Proto(partition, nativeScanBuilder, scan.relation.partitionSchema)
         })
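
As a small self-contained sketch of what the two new counts measure (hypothetical data, not the Spark classes): a single DataSourceRDDPartition can carry several InputPartitions, so the flattened inputPartitions total can exceed the RDD partition count — presumably the mismatch these printlns are meant to surface:

object PartitionCountDemo extends App {
  // Each inner Seq stands for the inputPartitions of one DataSourceRDDPartition.
  val partitions: Seq[Seq[String]] =
    Seq(Seq("file-0", "file-1"), Seq("file-2"), Seq("file-3", "file-4"))

  println(s"partitions: ${partitions.length}") // 3 RDD partitions
  var numPart = 0
  partitions.foreach(p => numPart = numPart + p.length) // same accumulation as above
  println(s"inputPartitions: ${numPart}") // 5 input partitions in total
}
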
16 changes: 16 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -64,6 +64,21 @@ class CometExecSuite extends CometTestBase {
     }
   }

+  test("limit 2 (cartesian product)") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
+        withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
+          val df = sql("SELECT tbl_a._1, tbl_b._2 FROM tbl_a JOIN tbl_b LIMIT 2")
+          df.explain()
+          checkSparkAnswerAndOperator(
+            df,
+            classOf[CollectLimitExec],
+            classOf[CartesianProductExec])
+        }
+      }
+    }
+  }
+
   test("TopK operator should return correct results on dictionary column with nulls") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTable("test_data") {
@@ -1041,6 +1056,7 @@ class CometExecSuite extends CometTestBase
     withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
       withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
         val df = sql("SELECT tbl_a._1, tbl_b._2 FROM tbl_a JOIN tbl_b LIMIT 2")
+        df.explain()
         checkSparkAnswerAndOperator(
           df,
           classOf[CollectLimitExec],
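
A note on the new test: setting spark.sql.autoBroadcastJoinThreshold to -1 disables broadcast joins, so the join with no equi-condition plans as a CartesianProductExec, and the LIMIT 2 on top adds a CollectLimitExec. Assuming checkSparkAnswerAndOperator follows the usual CometTestBase contract, the classOf arguments name operators exempted from the "every operator is a Comet operator" assertion while the result is still checked against Spark. The df.explain() calls, like the printlns elsewhere in this commit, read as temporary debugging aids.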
