[VL] Provide options to combine small batches before sending to shuffle #6009
Conversation
Force-pushed from 3be5ae8 to b6354c5.
@marin-ma There might be some batch-wise overhead around shuffle split processing. We may want to figure that out later so we can avoid batch-coalesce operations that introduce extra copies.
Force-pushed from 626e837 to 9e26c72.
Run Gluten Clickhouse CI
#5951 (comment)
Do you have any thoughts on moving to this approach, since ec3e92e has been merged? I don't have a strong preference, except that we need a configuration and metrics for batch appending. I suggest merging this first to make things configurable and displayable; then, if we want to continue with #5951's approach, you can open another PR to bring the code back and reuse the conf code added in this patch. And remove calls to
It's included, namely
/Benchmark Velox TPCDS
std::unique_ptr<gluten::JniColumnarBatchIterator> gluten::makeJniColumnarBatchIterator(
    JNIEnv* env,
    jobject jColumnarBatchItr,
    gluten::Runtime* runtime,
    std::shared_ptr<ArrowWriter> writer) {
  return std::make_unique<JniColumnarBatchIterator>(env, jColumnarBatchItr, runtime, writer);
}

gluten::JniColumnarBatchIterator::JniColumnarBatchIterator(
    JNIEnv* env,
    jobject jColumnarBatchItr,
    gluten::Runtime* runtime,
    std::shared_ptr<ArrowWriter> writer)
    : runtime_(runtime), writer_(writer) {
  // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
  if (env->GetJavaVM(&vm_) != JNI_OK) {
    std::string errorMessage = "Unable to get JavaVM instance";
    throw gluten::GlutenException(errorMessage);
  }
  serializedColumnarBatchIteratorClass_ =
      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;");
  serializedColumnarBatchIteratorHasNext_ =
      getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "hasNext", "()Z");
  serializedColumnarBatchIteratorNext_ = getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "next", "()J");
  jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr);
}

gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
  JNIEnv* env;
  attachCurrentThreadAsDaemonOrThrow(vm_, &env);
  env->DeleteGlobalRef(jColumnarBatchItr_);
  env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_);
  vm_->DetachCurrentThread();
}

std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
  JNIEnv* env;
  attachCurrentThreadAsDaemonOrThrow(vm_, &env);
  if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
    checkException(env);
    return nullptr; // stream ended
  }

  checkException(env);
  jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
  checkException(env);
  auto batch = runtime_->objectStore()->retrieve<ColumnarBatch>(handle);
  if (writer_ != nullptr) {
    // save snapshot of the batch to file
    std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
    std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
    auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
    GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
    GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
  }
  return batch;
}
code movement
class JniColumnarBatchIterator : public ColumnarBatchIterator {
 public:
  explicit JniColumnarBatchIterator(
      JNIEnv* env,
      jobject jColumnarBatchItr,
      Runtime* runtime,
      std::shared_ptr<ArrowWriter> writer);

  // singleton
  JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete;
  JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete;
  JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) = delete;
  JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete;

  virtual ~JniColumnarBatchIterator();

  std::shared_ptr<ColumnarBatch> next() override;

 private:
  JavaVM* vm_;
  jobject jColumnarBatchItr_;
  Runtime* runtime_;
  std::shared_ptr<ArrowWriter> writer_;

  jclass serializedColumnarBatchIteratorClass_;
  jmethodID serializedColumnarBatchIteratorHasNext_;
  jmethodID serializedColumnarBatchIteratorNext_;
};

std::unique_ptr<JniColumnarBatchIterator> makeJniColumnarBatchIterator(
    JNIEnv* env,
    jobject jColumnarBatchItr,
    Runtime* runtime,
    std::shared_ptr<ArrowWriter> writer);
code movement
case p if TransformHints.isNotTransformable(p) =>
  p
case s: ShuffleExchangeExec
    if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) &&
      BackendsApiManager.getSettings.supportColumnarShuffleExec() =>
  logDebug(s"Columnar Processing for ${s.getClass} is currently supported.")
  BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(s)
case b: BroadcastExchangeExec =>
  val child = b.child
  logDebug(s"Columnar Processing for ${b.getClass} is currently supported.")
  ColumnarBroadcastExchangeExec(b.mode, child)
code simplification
@@ -101,7 +101,7 @@ trait SparkPlanExecApi {
       aggregateExpressions: Seq[AggregateExpression],
       aggregateAttributes: Seq[Attribute]): HashAggregateExecPullOutBaseHelper

-  def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec, newChild: SparkPlan): SparkPlan
+  def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan
API simplification
I think the design is the same as facebookincubator/velox#7801 and it would be great to implement on the Gluten side. My suggestion is that we could implement
I think Filter and HashJoin (with filter) may need this as well. Thanks.
Yeah, the Velox issue above lists all the operators that may need this optimization. We could provide a config for these operators whose value is a list of operator name strings.
Just comment this:
@zhztheplayer Let's remove the option and make it the default behavior, as long as it benefits all cases.
It's because of the initialization cost of the current split function. Currently we use three loops (per column, per reducer, per row) to do the split; if the column data is cached, this is the best way to scale with the number of reducers. However, to achieve this we need a lot of initialization work to create several vectors. If the input batch is small, we suffer from that initialization overhead, which can be even larger than the cost of copying into bigger batches. Another issue is that if the data size is too large and exceeds the cache size, performance becomes very poor.
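To make that cost argument concrete, here is a rough, illustrative Scala sketch of the split shape described above. It is not the actual Gluten/Velox split code and all names are made up; the point is that the setup work scales with the number of columns and partitions and is paid on every call, so it cannot be amortized when the input batch has only a few rows.

import scala.collection.mutable.ArrayBuffer

// Illustrative pseudocode only; not the real shuffle split implementation.
final case class Batch(columns: Array[Array[Long]], numRows: Int)

def splitBatch(batch: Batch, numPartitions: Int, partitionOf: Int => Int): Array[Array[Array[Long]]] = {
  // Per-call initialization: partition row-id lists plus one output buffer per
  // (column, partition). This setup costs roughly O(numColumns * numPartitions)
  // regardless of how many rows the batch has, so tiny batches cannot amortize it.
  val rowIds = Array.fill(numPartitions)(ArrayBuffer.empty[Int])
  (0 until batch.numRows).foreach(r => rowIds(partitionOf(r)) += r)
  val out = Array.ofDim[Array[Long]](batch.columns.length, numPartitions)

  // The three split loops: per column, per reducer (partition), per row.
  for (c <- batch.columns.indices; p <- 0 until numPartitions) {
    val ids = rowIds(p)
    val dst = new Array[Long](ids.length)
    var i = 0
    while (i < ids.length) { dst(i) = batch.columns(c)(ids(i)); i += 1 }
    out(c)(p) = dst
  }
  out
}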
It's planned, but I'll use another PR to do that. Setting it to true by default may fail some plan checks in UTs.
Why do the UTs fail? We should fix that.
The PR adds a new operator
Let's update the UT code then.
The tricky part is that I don't see we always follow either one. Maybe we can change the option name to
One misunderstanding is that "we should avoid memcpy as much as possible", but in fact Gluten isn't memory-throughput bound yet. Sequential data reads and writes are cheap operations as long as the block size isn't too small (e.g. a few unpredictable bytes, where the overhead becomes branch misprediction) or too large (e.g. at the GB level).
In the long term, Velox should limit the batch size to the maxBatchSize we configured, as Gluten does. For operators like Combine, we may limit the batch size to 2 x maxBatchSize so we don't need to cache the second batch.
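A rough Scala sketch of that bound (illustrative only; MiniBatch and coalesce are made-up names, not the operator added by this PR): keep appending inputs until at least minBatchSize rows are buffered. Because each appended input holds at most maxBatchSize rows, the output never exceeds minBatchSize + maxBatchSize rows, i.e. roughly 2 x maxBatchSize when the two are equal, so a second batch never needs to be cached.

// Illustrative only: coalesce small batches until a minimum row count is reached.
// Not the actual operator implementation from this PR.
final case class MiniBatch(numRows: Int, payload: Vector[Long])

def coalesce(input: Iterator[MiniBatch], minBatchSize: Int): Iterator[MiniBatch] =
  new Iterator[MiniBatch] {
    def hasNext: Boolean = input.hasNext
    def next(): MiniBatch = {
      var rows = 0
      var data = Vector.empty[Long]
      // Append until at least minBatchSize rows are buffered or the input ends.
      // Each appended batch holds at most maxBatchSize rows, so the output stays
      // below minBatchSize + maxBatchSize rows and no look-ahead caching is needed.
      while (rows < minBatchSize && input.hasNext) {
        val b = input.next()
        rows += b.numRows
        data ++= b.payload
      }
      MiniBatch(rows, data)
    }
  }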
We may need to propose this Exec to Velox.
LGTM. Thanks!
@XinShuoWang I'll merge this and then do some follow-ups. Let me know if you have any comments, thanks. Also, thank you all for reviewing. :)
The PR disables the feature by default. Let's merge this in advance and benchmark the subsequent PRs.
It's observed that Velox hash-based shuffle is slowed down by small input batches.
The patch:
- spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle (default: false): set to true to combine small batches, with the minimal batch size determined by spark.gluten.sql.columnar.maxBatchSize. (Note the misnaming of maxBatchSize in Gluten; it tends to act as minBatchSize, or just batchSize.)
- spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle (optional): set to override the minimal batch size used by coalesceBatchesBeforeShuffle.
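For reference, a minimal sketch of how these options could be set on a Spark session (assuming Gluten is already enabled; the sizes below are arbitrary example values, not recommendations):

import org.apache.spark.sql.SparkSession

// Example settings only; tune the sizes for your workload.
val spark = SparkSession.builder()
  .appName("coalesce-batches-before-shuffle-example")
  .config("spark.gluten.sql.columnar.maxBatchSize", "4096")
  .config("spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle", "true")
  // Optional: override the minimal batch size used for coalescing before shuffle.
  .config("spark.gluten.sql.columnar.backend.velox.minBatchSizeForShuffle", "2048")
  .getOrCreate()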
Comparisons (spark.gluten.sql.columnar.backend.velox.coalesceBatchesBeforeShuffle=false/true)
Q31 total time, before and after (SF1000 partitioned table, scan partitions 112, shuffle partitions 112):
Closer look at exchange, before and after: