diff --git a/README.md b/README.md
index caad63924..1f77aa376 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@ Results for our benchmark derived from TPC-DS are available in the [benchmarking
 ## Use Commodity Hardware

 Comet leverages commodity hardware, eliminating the need for costly hardware upgrades or
-specialized hardware accelerators, such as GPUs or FGPA. By maximizing the utilization of commodity hardware, Comet
+specialized hardware accelerators, such as GPUs or FPGA. By maximizing the utilization of commodity hardware, Comet
 ensures cost-effectiveness and scalability for your Spark deployments.

 ## Spark Compatibility
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index e68a49229..860fe4901 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -40,6 +40,10 @@ import org.apache.comet.vector.NativeUtil
  *   The input iterators producing sequence of batches of Arrow Arrays.
  * @param protobufQueryPlan
  *   The serialized bytes of Spark execution plan.
+ * @param numParts
+ *   The number of partitions.
+ * @param partitionIndex
+ *   The index of the partition.
  */
 class CometExecIterator(
     val id: Long,
@@ -47,7 +51,9 @@ class CometExecIterator(
     numOutputCols: Int,
     protobufQueryPlan: Array[Byte],
     arrowFfiMetric: Option[SQLMetric],
-    nativeMetrics: CometMetricNode)
+    nativeMetrics: CometMetricNode,
+    numParts: Int,
+    partitionIndex: Int)
     extends Iterator[ColumnarBatch] {

   private val nativeLib = new Native()
@@ -94,12 +100,14 @@ class CometExecIterator(
   }

   def getNextBatch(): Option[ColumnarBatch] = {
+    assert(partitionIndex >= 0 && partitionIndex < numParts)
+
     nativeUtil.getNextBatch(
       numOutputCols,
       arrowFfiMetric,
       (arrayAddrs, schemaAddrs) => {
         val ctx = TaskContext.get()
-        nativeLib.executePlan(ctx.stageId(), ctx.partitionId(), plan, arrayAddrs, schemaAddrs)
+        nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs)
       })
   }

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
index 8cc03856c..9698dc98b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
@@ -51,9 +51,10 @@ object CometExecUtils {
       childPlan: RDD[ColumnarBatch],
       outputAttribute: Seq[Attribute],
       limit: Int): RDD[ColumnarBatch] = {
-    childPlan.mapPartitionsInternal { iter =>
+    val numParts = childPlan.getNumPartitions
+    childPlan.mapPartitionsWithIndexInternal { case (idx, iter) =>
       val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit).get
-      CometExec.getCometIterator(Seq(iter), outputAttribute.length, limitOp)
+      CometExec.getCometIterator(Seq(iter), outputAttribute.length, limitOp, numParts, idx)
     }
   }

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index cc614e23d..92d996c32 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -80,12 +80,13 @@ case class CometTakeOrderedAndProjectExec(
         val localTopK = if (orderingSatisfies) {
           CometExecUtils.getNativeLimitRDD(childRDD, child.output, limit)
         } else {
-          childRDD.mapPartitionsInternal { iter =>
+          val numParts = childRDD.getNumPartitions
+          childRDD.mapPartitionsWithIndexInternal { case (idx, iter) =>
             val topK = CometExecUtils
               .getTopKNativePlan(child.output, sortOrder, child, limit)
               .get
-            CometExec.getCometIterator(Seq(iter), child.output.length, topK)
+            CometExec.getCometIterator(Seq(iter), child.output.length, topK, numParts, idx)
           }
         }

@@ -105,7 +106,7 @@ case class CometTakeOrderedAndProjectExec(
         val topKAndProjection = CometExecUtils
           .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit)
           .get
-        val it = CometExec.getCometIterator(Seq(iter), output.length, topKAndProjection)
+        val it = CometExec.getCometIterator(Seq(iter), output.length, topKAndProjection, 1, 0)
         setSubqueries(it.id, this)

         Option(TaskContext.get()).foreach { context =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala
index 6db8c67d5..fdf8bf393 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala
@@ -31,16 +31,20 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  */
 private[spark] class ZippedPartitionsRDD(
     sc: SparkContext,
-    var f: (Seq[Iterator[ColumnarBatch]]) => Iterator[ColumnarBatch],
+    var f: (Seq[Iterator[ColumnarBatch]], Int, Int) => Iterator[ColumnarBatch],
     var zipRdds: Seq[RDD[ColumnarBatch]],
     preservesPartitioning: Boolean = false)
     extends ZippedPartitionsBaseRDD[ColumnarBatch](sc, zipRdds, preservesPartitioning) {

+  // We need to get the number of partitions in `compute` but `getNumPartitions` is not available
+  // on the executors. So we need to capture it here.
+  private val numParts: Int = this.getNumPartitions
+
   override def compute(s: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     val iterators =
       zipRdds.zipWithIndex.map(pair => pair._1.iterator(partitions(pair._2), context))
-    f(iterators)
+    f(iterators, numParts, s.index)
   }

   override def clearDependencies(): Unit = {
@@ -52,7 +56,8 @@ private[spark] class ZippedPartitionsRDD(
 object ZippedPartitionsRDD {
   def apply(sc: SparkContext, rdds: Seq[RDD[ColumnarBatch]])(
-      f: (Seq[Iterator[ColumnarBatch]]) => Iterator[ColumnarBatch]): RDD[ColumnarBatch] =
+      f: (Seq[Iterator[ColumnarBatch]], Int, Int) => Iterator[ColumnarBatch])
+      : RDD[ColumnarBatch] =
     withScope(sc) {
       new ZippedPartitionsRDD(sc, f, rdds)
     }
 }
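Note on the `numParts` capture above: per the comment in `ZippedPartitionsRDD`, `getNumPartitions` is not available on the executors, so both `ZippedPartitionsRDD` and `getNativeLimitRDD` read the partition count while the plan is still on the driver and let the value travel to the executors inside the serialized RDD or closure, alongside the partition index Spark already supplies. A minimal, self-contained sketch of the same pattern against the public RDD API (the object name and local-mode session are illustrative, not part of this patch):

    import org.apache.spark.sql.SparkSession

    object PartitionInfoSketch {
      def main(args: Array[String]): Unit = {
        val spark =
          SparkSession.builder().master("local[2]").appName("partition-info-sketch").getOrCreate()
        val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 4)

        // Read the partition count on the driver, before any task runs; the RDD's
        // partition metadata is not available from inside a task.
        val numParts = rdd.getNumPartitions

        // Each task receives its partition index from mapPartitionsWithIndex, while the
        // captured numParts reaches the executors inside the closure.
        val summary = rdd
          .mapPartitionsWithIndex { case (idx, iter) =>
            Iterator(s"partition $idx of $numParts holds ${iter.size} rows")
          }
          .collect()

        summary.foreach(println)
        spark.stop()
      }
    }
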
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index af4202328..bcb8d094b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -227,13 +227,14 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
       outputPartitioning: Partitioning,
       serializer: Serializer,
       metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
+    val numParts = rdd.getNumPartitions
     val dependency = new CometShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
       rdd.map(
         (0, _)
       ), // adding fake partitionId that is always 0 because ShuffleDependency requires it
       serializer = serializer,
       shuffleWriterProcessor =
-        new CometShuffleWriteProcessor(outputPartitioning, outputAttributes, metrics),
+        new CometShuffleWriteProcessor(outputPartitioning, outputAttributes, metrics, numParts),
       shuffleType = CometNativeShuffle,
       partitioner = new Partitioner {
         override def numPartitions: Int = outputPartitioning.numPartitions
@@ -449,7 +450,8 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
 class CometShuffleWriteProcessor(
     outputPartitioning: Partitioning,
     outputAttributes: Seq[Attribute],
-    metrics: Map[String, SQLMetric])
+    metrics: Map[String, SQLMetric],
+    numParts: Int)
     extends ShimCometShuffleWriteProcessor {

   private val OFFSET_LENGTH = 8
@@ -493,7 +495,9 @@ class CometShuffleWriteProcessor(
       outputAttributes.length,
       nativePlan,
       metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY),
-      CometMetricNode(nativeSQLMetrics))
+      CometMetricNode(nativeSQLMetrics),
+      numParts,
+      context.partitionId())

     while (cometIter.hasNext) {
       cometIter.next()
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index eb754f5e6..d2003b52a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -120,8 +120,17 @@ object CometExec {
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
       numOutputCols: Int,
-      nativePlan: Operator): CometExecIterator = {
-    getCometIterator(inputs, numOutputCols, nativePlan, None, CometMetricNode(Map.empty))
+      nativePlan: Operator,
+      numParts: Int,
+      partitionIdx: Int): CometExecIterator = {
+    getCometIterator(
+      inputs,
+      numOutputCols,
+      nativePlan,
+      None,
+      CometMetricNode(Map.empty),
+      numParts,
+      partitionIdx)
   }

   def getCometIterator(
@@ -129,12 +138,22 @@ object CometExec {
       numOutputCols: Int,
       nativePlan: Operator,
       arrowFfiMetric: Option[SQLMetric],
-      nativeMetrics: CometMetricNode): CometExecIterator = {
+      nativeMetrics: CometMetricNode,
+      numParts: Int,
+      partitionIdx: Int): CometExecIterator = {
     val outputStream = new ByteArrayOutputStream()
     nativePlan.writeTo(outputStream)
     outputStream.close()
     val bytes = outputStream.toByteArray
-    new CometExecIterator(newIterId, inputs, numOutputCols, bytes, arrowFfiMetric, nativeMetrics)
+    new CometExecIterator(
+      newIterId,
+      inputs,
+      numOutputCols,
+      bytes,
+      arrowFfiMetric,
+      nativeMetrics,
+      numParts,
+      partitionIdx)
   }

   /**
@@ -215,14 +234,19 @@ abstract class CometNativeExec extends CometExec {
     // TODO: support native metrics for all operators.
     val nativeMetrics = CometMetricNode.fromCometPlan(this)

-    def createCometExecIter(inputs: Seq[Iterator[ColumnarBatch]]): CometExecIterator = {
+    def createCometExecIter(
+        inputs: Seq[Iterator[ColumnarBatch]],
+        numParts: Int,
+        partitionIndex: Int): CometExecIterator = {
       val it = new CometExecIterator(
         CometExec.newIterId,
         inputs,
         output.length,
         serializedPlanCopy,
         metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY),
-        nativeMetrics)
+        nativeMetrics,
+        numParts,
+        partitionIndex)

       setSubqueries(it.id, this)

@@ -297,7 +321,7 @@ abstract class CometNativeExec extends CometExec {
         throw new CometRuntimeException(s"No input for CometNativeExec:\n $this")
       }

-      ZippedPartitionsRDD(sparkContext, inputs.toSeq)(createCometExecIter(_))
+      ZippedPartitionsRDD(sparkContext, inputs.toSeq)(createCometExecIter)
     }
   }

diff --git a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
index ef0485dfe..6ca38e831 100644
--- a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
@@ -37,7 +37,9 @@ class CometNativeSuite extends CometTestBase {
           override def next(): ColumnarBatch = throw new NullPointerException()
         }),
         1,
-        limitOp)
+        limitOp,
+        1,
+        0)
       cometIter.next()
       cometIter.close()
       value
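
A note on the `executePlan` change in `CometExecIterator`: the partition index is now threaded in from the operator instead of being read from `TaskContext.partitionId()`. The two are not always the same number: `partitionId()` is the index of the task's partition within its stage, while an RDD evaluated inside that stage can be computing a partition with a different index. The sketch below shows one such divergence using a narrow `coalesce`; it is a generic Spark example under assumed local-mode settings and an illustrative object name, not Comet code:

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.SparkSession

    object PartitionIdVsIndexSketch {
      def main(args: Array[String]): Unit = {
        val spark =
          SparkSession.builder().master("local[4]").appName("partition-id-vs-index").getOrCreate()
        val sc = spark.sparkContext

        val pairs = sc
          .parallelize(1 to 8, numSlices = 4)
          // idx is the partition index of this RDD; TaskContext.partitionId() is the
          // index of the task's partition in the stage that runs it.
          .mapPartitionsWithIndex { case (idx, iter) =>
            iter.map(_ => (idx, TaskContext.get().partitionId()))
          }
          // A narrow coalesce keeps everything in one stage but folds the four upstream
          // partitions into two tasks, so idx and partitionId() stop matching.
          .coalesce(2)
          .collect()
          .distinct

        // Typically prints pairs such as (0,0), (1,0), (2,1), (3,1).
        pairs.sorted.foreach(println)
        spark.stop()
      }
    }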