-
Notifications
You must be signed in to change notification settings - Fork 164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Enable Comet broadcast by default #213
Conversation
case plan | ||
if isCometNative(plan) && | ||
plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) => | ||
val newChildren = plan.children.map { | ||
case b: BroadcastExchangeExec | ||
if isCometNative(b.child) && | ||
isCometOperatorEnabled(conf, "broadcastExchangeExec") => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the common operator enable config to control broadcast operator as other operators.
val operatorDisabledFlag = s"$COMET_EXEC_CONFIG_PREFIX.$operator.disabled" | ||
conf.getConfString(operatorFlag, "false").toBoolean || isCometAllOperatorEnabled(conf) && | ||
!conf.getConfString(operatorDisabledFlag, "false").toBoolean |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is added to be able disable certain operator specially.
a24c3b5
to
86167fd
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #213 +/- ##
============================================
+ Coverage 33.48% 33.58% +0.09%
- Complexity 776 780 +4
============================================
Files 108 107 -1
Lines 37178 37211 +33
Branches 8146 8160 +14
============================================
+ Hits 12448 12496 +48
+ Misses 22107 22076 -31
- Partials 2623 2639 +16 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
cc @sunchao |
cc @sunchao Please take a look. Thanks. |
d25c21d
to
5fa8781
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM (pending CI)
A few tests seem needed to be updated. Let me take a look. |
batches.map { batch => | ||
val codec = CompressionCodec.createCodec(SparkEnv.get.conf) | ||
val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate) | ||
val out = new DataOutputStream(codec.compressedOutputStream(cbbos)) | ||
|
||
val count = new NativeUtil().serializeBatches(iter, out) | ||
val (fieldVectors, batchProviderOpt) = nativeUtil.getBatchFieldVectors(batch) | ||
val root = new VectorSchemaRoot(fieldVectors.asJava) | ||
val provider = batchProviderOpt.getOrElse(nativeUtil.getDictionaryProvider) | ||
|
||
val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out)) | ||
writer.start() | ||
writer.writeBatch() | ||
|
||
root.clear() | ||
writer.end() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously serializeBatches
is wrong which serializes all batches with a ArrowStreamWriter
. It causes wrong results when serializing dictionary arrays, i.e., #241.
Each batch could have different dictionary provider content. But when ArrowStreamWriter
starts to serialize, it writes out dictionaries at the beginning. So later batch will use incorrect dictionary value.
@@ -191,7 +193,7 @@ case class CometBroadcastExchangeExec(originalPlan: SparkPlan, child: SparkPlan) | |||
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { | |||
val broadcasted = executeBroadcast[Array[ChunkedByteBuffer]]() | |||
|
|||
new CometBatchRDD(sparkContext, broadcasted.value.length, broadcasted) | |||
new CometBatchRDD(sparkContext, childRDD.getNumPartitions, broadcasted) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The broadcast RDD must have same number of partitions as child RDD. Previously we serialize all batches in one partition into a ChunkedByteBuffer
, so broadcasted.value.length
is the number of partitions. Now it is changed to serialize one batch in one ChunkedByteBuffer
, so we need to use the correct number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update. Child RDD partition number may also not be same as the zipping side. We need to get the number of partition of zipping side when triggering this execution method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This issue is described in #243.
3010119
to
25eec59
Compare
def serializeBatches(batches: Iterator[ColumnarBatch]): Iterator[(Long, ChunkedByteBuffer)] = { | ||
batches.map { batch => | ||
val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider | ||
|
||
val codec = CompressionCodec.createCodec(SparkEnv.get.conf) | ||
val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to move serializeBatches
into spark
package because ChunkedByteBufferOutputStream
is a spark private class. I cannot move serializeBatches
to spark
module because it uses arrow packages (we shade arrow in common
module).
Merged. Thanks. |
These changes to testing were included in apache/datafusion-comet#213
* feat: Remove COMET_EXEC_BROADCAST_ENABLED * Fix * Fix * Update plan stability * Fix * Remove unused import and class * Fix * Remove unused imports * Fix * Fix scala style * fix * Fix * Update diff
Which issue does this PR close?
Closes #212.
Closes #241.
Closes #243.
Rationale for this change
What changes are included in this PR?
How are these changes tested?