Commit

init

viirya committed Nov 14, 2024
1 parent 786250a commit b3e03ae
Showing 4 changed files with 92 additions and 443 deletions.

CometSparkSessionExtensions.scala

@@ -202,8 +202,9 @@ class CometSparkSessionExtensions
            if CometNativeScanExec.isSchemaSupported(requiredSchema)
              && CometNativeScanExec.isSchemaSupported(partitionSchema)
              && COMET_FULL_NATIVE_SCAN_ENABLED.get =>
-         logInfo("Comet extension enabled for v1 Scan")
-         CometNativeScanExec(scanExec, session)
+         logInfo("Comet extension enabled for v1 full native Scan")
+         CometScanExec(scanExec, session)
+
        // data source V1
        case scanExec @ FileSourceScanExec(
            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
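The rule above now plans the v1 Parquet scan as CometScanExec even when the full native scan flag is set; the switch to CometNativeScanExec happens later, in the transformUp pass below. As a usage note (not part of this commit), here is a minimal sketch of enabling that path from user code; the CometPlugin class name, the `.key` accessor on the config constant, and any extra settings a real deployment needs are assumptions, not something this diff shows.

```scala
// Sketch only: enable Comet and the full native scan flag for a local session.
// Assumes org.apache.comet.CometConf.COMET_FULL_NATIVE_SCAN_ENABLED exposes its
// config key via `.key`; real deployments may need further Comet settings
// (e.g. off-heap memory) that are omitted here.
import org.apache.comet.CometConf
import org.apache.spark.sql.SparkSession

object FullNativeScanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config(CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key, "true")
      .getOrCreate()

    // A v1 Parquet read goes through FileSourceScanExec, which the rule above
    // rewrites into CometScanExec (and later into CometNativeScanExec).
    spark.read.parquet("/tmp/example.parquet").show()

    spark.stop()
  }
}
```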

@@ -365,10 +366,18 @@ class CometSparkSessionExtensions
      }

      plan.transformUp {
-       case op if isCometScan(op) =>
+       // Comet JVM + native scan for V1 and V2
+       case op if isCometScan(op) && !COMET_FULL_NATIVE_SCAN_ENABLED.get =>
          val nativeOp = QueryPlanSerde.operator2Proto(op).get
          CometScanWrapper(nativeOp, op)

+       // Fully native scan for V1
+       case scan: CometScanExec if COMET_FULL_NATIVE_SCAN_ENABLED.get =>
+         val nativeOp = QueryPlanSerde.operator2Proto(scan).get
+         // scalastyle:off println
+         println(s"Comet full native scan: $nativeOp")
+         CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
+
        case op if shouldApplySparkToColumnar(conf, op) =>
          val cometOp = CometSparkToColumnarExec(op)
          val nativeOp = QueryPlanSerde.operator2Proto(cometOp).get
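The transformUp pass now branches on the flag: with the full native scan disabled, a Comet scan operator is wrapped in CometScanWrapper (JVM scan feeding native operators); with it enabled, the CometScanExec placed by the earlier rule is serialized through operator2Proto and replaced by CometNativeScanExec. The following self-contained sketch illustrates this bottom-up, flag-keyed rewrite pattern on a toy plan tree; the node types and flag below are illustrative stand-ins, not Comet or Spark classes.

```scala
// Toy model of a bottom-up rewrite keyed on a flag, mirroring the two scan
// cases above. None of these types come from Comet or Spark.
object RewriteSketch {
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(table: String, children: Seq[Plan] = Nil) extends Plan
  case class ScanWrapper(inner: Plan) extends Plan { def children: Seq[Plan] = Seq(inner) }
  case class NativeScan(table: String) extends Plan { def children: Seq[Plan] = Nil }
  case class Project(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

  // Rebuild children first, then give the rule a chance to rewrite the node.
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = plan match {
      case Scan(t, cs)    => Scan(t, cs.map(transformUp(_)(rule)))
      case ScanWrapper(c) => ScanWrapper(transformUp(c)(rule))
      case n: NativeScan  => n
      case Project(c)     => Project(transformUp(c)(rule))
    }
    rule.applyOrElse(withNewChildren, identity[Plan])
  }

  def rewrite(plan: Plan, fullNativeScan: Boolean): Plan = transformUp(plan) {
    // JVM scan feeding native operators: keep the scan but wrap it.
    case s: Scan if !fullNativeScan => ScanWrapper(s)
    // Fully native scan: replace the scan node outright.
    case Scan(t, _) if fullNativeScan => NativeScan(t)
  }

  def main(args: Array[String]): Unit = {
    println(rewrite(Project(Scan("t")), fullNativeScan = true))  // Project(NativeScan(t))
    println(rewrite(Project(Scan("t")), fullNativeScan = false)) // Project(ScanWrapper(Scan(t,List())))
  }
}
```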
Expand Down Expand Up @@ -965,6 +974,9 @@ class CometSparkSessionExtensions

    var newPlan = transform(normalizedPlan)

+   // scalastyle:off println
+   println(s"newPlan: $newPlan")
+
    // if the plan cannot be run fully natively then explain why (when appropriate
    // config is enabled)
    if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
Expand Down Expand Up @@ -1221,8 +1233,7 @@ object CometSparkSessionExtensions extends Logging {
  }

  def isCometScan(op: SparkPlan): Boolean = {
-   op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] ||
-     op.isInstanceOf[CometNativeScanExec]
+   op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
  }

  private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {

QueryPlanSerde.scala

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, Normalize
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometNativeScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec, DecimalPrecision}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometNativeScanExec, CometScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec, DecimalPrecision}
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution._

@@ -2480,7 +2480,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
      childOp.foreach(result.addChildren)

      op match {
-       case scan: CometNativeScanExec =>
+
+       // Fully native scan for V1
+       case scan: CometScanExec if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.get =>
          val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
          nativeScanBuilder.setSource(op.simpleStringWithNodeId())

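On the serde side, the fully native case now matches CometScanExec under the same config guard rather than CometNativeScanExec, and builds a NativeScan protobuf message for it. A small sketch of that builder-style construction follows; only newBuilder() and setSource() appear in this hunk, while the import path and the standard protobuf build() finalizer are assumptions, and the real serde presumably sets more fields than shown here.

```scala
// Sketch only: the builder calls visible in this hunk, finished with the
// standard protobuf build(). The import path is assumed; the real serde
// likely populates additional fields that are omitted here.
import org.apache.comet.serde.OperatorOuterClass

object NativeScanProtoSketch {
  def nativeScanFor(sourceDescription: String): OperatorOuterClass.NativeScan = {
    val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
    nativeScanBuilder.setSource(sourceDescription)
    nativeScanBuilder.build()
  }
}
```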