[VL] Row based sort shuffle implementation #6475
@@ -349,24 +349,30 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
   def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
-    if (GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle) {
-      VeloxAppendBatchesExec(plan, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
-    } else {
-      plan
+    plan match {
+      case shuffle: ColumnarShuffleExchangeExec
+          if !shuffle.useSortBasedShuffle &&
+            GlutenConfig.getConf.veloxCoalesceBatchesBeforeShuffle =>
+        val appendBatches =
+          VeloxAppendBatchesExec(shuffle.child, GlutenConfig.getConf.veloxMinBatchSizeForShuffle)
+        shuffle.withNewChildren(Seq(appendBatches))
+      case _ => plan
     }
   }

Review comment: Is this because sort already performs the coalesce behavior?

Reply: Coalesced or not, sort converts all inputs into rows, so it won't benefit from coalescing. The test results show the coalesce op only brings extra overhead.
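To make the rewrite above concrete, here is a minimal, self-contained Scala sketch of the same pattern using stub plan classes (`Plan`, `Scan`, `Shuffle`, `AppendBatches` are illustrative stand-ins, not Gluten's real operators): the append-batches operator is inserted only in front of a hash-based shuffle, since the sort-based writer row-izes every input batch and gains nothing from larger batches.

```scala
object AppendBatchesSketch {
  // Stub plan nodes standing in for Gluten's real SparkPlan classes.
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class AppendBatches(child: Plan, minBatchSize: Int) extends Plan
  case class Shuffle(child: Plan, useSortBasedShuffle: Boolean) extends Plan

  // Mirrors the shape of the new maybeAddAppendBatchesExec: wrap the shuffle's
  // child only when the shuffle is hash-based and batch coalescing is enabled.
  def maybeAddAppendBatches(plan: Plan, coalesceEnabled: Boolean, minBatchSize: Int): Plan =
    plan match {
      case s @ Shuffle(child, useSort) if !useSort && coalesceEnabled =>
        s.copy(child = AppendBatches(child, minBatchSize))
      // Sort-based shuffle row-izes all inputs, so coalescing only adds overhead.
      case other => other
    }

  def main(args: Array[String]): Unit = {
    val hash = Shuffle(Scan("t"), useSortBasedShuffle = false)
    val sort = Shuffle(Scan("t"), useSortBasedShuffle = true)
    // Hash-based shuffle gets the extra operator above its child.
    println(maybeAddAppendBatches(hash, coalesceEnabled = true, minBatchSize = 4096))
    // Sort-based shuffle is returned unchanged.
    println(maybeAddAppendBatches(sort, coalesceEnabled = true, minBatchSize = 4096))
  }
}
```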
     val child = shuffle.child

-    shuffle.outputPartitioning match {
+    val newShuffle = shuffle.outputPartitioning match {
       case HashPartitioning(exprs, _) =>
         val hashExpr = new Murmur3Hash(exprs)
         val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
         val projectTransformer = ProjectExecTransformer(projectList, child)
         val validationResult = projectTransformer.doValidate()
         if (validationResult.ok()) {
-          val newChild = maybeAddAppendBatchesExec(projectTransformer)
-          ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1))
+          ColumnarShuffleExchangeExec(
+            shuffle,
+            projectTransformer,
+            projectTransformer.output.drop(1))
         } else {
           FallbackTags.add(shuffle, validationResult)
           shuffle.withNewChildren(child :: Nil)
@@ -382,8 +388,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
         // null type since the value always be null.
         val columnsForHash = child.output.filterNot(_.dataType == NullType)
         if (columnsForHash.isEmpty) {
-          val newChild = maybeAddAppendBatchesExec(child)
-          ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
+          ColumnarShuffleExchangeExec(shuffle, child, child.output)
         } else {
           val hashExpr = new Murmur3Hash(columnsForHash)
           val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ child.output
@@ -404,18 +409,20 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
             ProjectExecTransformer(projectList.drop(1), sortByHashCode)
           val validationResult = dropSortColumnTransformer.doValidate()
           if (validationResult.ok()) {
-            val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer)
-            ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
+            ColumnarShuffleExchangeExec(
+              shuffle,
+              dropSortColumnTransformer,
+              dropSortColumnTransformer.output)
           } else {
             FallbackTags.add(shuffle, validationResult)
             shuffle.withNewChildren(child :: Nil)
           }
         }
       }
       case _ =>
-        val newChild = maybeAddAppendBatchesExec(child)
-        ColumnarShuffleExchangeExec(shuffle, newChild, null)
+        ColumnarShuffleExchangeExec(shuffle, child, null)
     }
+    maybeAddAppendBatchesExec(newShuffle)
   }

   /** Generate ShuffledHashJoinExecTransformer. */
@@ -572,11 +579,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     val numOutputRows = metrics("numOutputRows")
     val deserializeTime = metrics("deserializeTime")
     val readBatchNumRows = metrics("avgReadBatchNumRows")
-    val decompressTime: Option[SQLMetric] = if (!isSort) {
-      Some(metrics("decompressTime"))
-    } else {
-      None
-    }
+    val decompressTime = metrics("decompressTime")
     if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
       val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer")
       val constructor =
@@ -25,7 +25,7 @@ namespace gluten {
 class ColumnarBatchSerializer {
  public:
-  ColumnarBatchSerializer(arrow::MemoryPool* arrowPool, struct ArrowSchema* cSchema) : arrowPool_(arrowPool) {}
+  ColumnarBatchSerializer(arrow::MemoryPool* arrowPool) : arrowPool_(arrowPool) {}

   virtual ~ColumnarBatchSerializer() = default;

Review comment: Good cleanup.
@@ -37,23 +37,13 @@ int32_t computePid(const int32_t* pidArr, int64_t i, int32_t numPartitions) {
   return pid;
 }

-arrow::Status gluten::HashPartitioner::compute(
-    const int32_t* pidArr,
-    const int64_t numRows,
-    std::vector<uint32_t>& row2partition,
-    std::vector<uint32_t>& partition2RowCount) {
+arrow::Status
+gluten::HashPartitioner::compute(const int32_t* pidArr, const int64_t numRows, std::vector<uint32_t>& row2partition) {
   row2partition.resize(numRows);
-  std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);

   for (auto i = 0; i < numRows; ++i) {
     auto pid = computePid(pidArr, i, numPartitions_);
     row2partition[i] = pid;
   }

-  for (auto& pid : row2partition) {
-    partition2RowCount[pid]++;
-  }
-
   return arrow::Status::OK();
 }

Review comment: It seems this fill operation is entirely removed. Will this impact performance?

Reply: This logic is moved into the hash shuffle writer (https://github.com/apache/incubator-gluten/pull/6475/files#diff-0ea9c330c6637a7c2814fd738102f2928bca7b74a99c9f15163a6dce3f3c1e5fR278), because the sort-based shuffle writer does not use partition2RowCount.
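For clarity, here is a small sketch of the per-partition row counting that this hunk moves out of `HashPartitioner::compute` and into the hash shuffle writer (written in Scala for illustration; the real code is C++, and the function name here is hypothetical). Sort-based shuffle only needs `row2partition`, so the histogram is now built only on the path that uses it.

```scala
object Partition2RowCountSketch {
  // Given each row's target partition, rebuild the rows-per-partition histogram.
  // This mirrors the std::fill + counting loop removed from HashPartitioner::compute.
  def countRowsPerPartition(row2partition: Array[Int], numPartitions: Int): Array[Int] = {
    val partition2RowCount = Array.fill(numPartitions)(0) // zero-init, was std::fill(..., 0)
    row2partition.foreach(pid => partition2RowCount(pid) += 1)
    partition2RowCount
  }

  def main(args: Array[String]): Unit = {
    // Rows 0..4 hashed to partitions 1, 0, 1, 2, 1 -> counts 1, 3, 1.
    println(countRowsPerPartition(Array(1, 0, 1, 2, 1), numPartitions = 3).mkString(","))
  }
}
```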
Review comment: I understand this is newly added for the sort-based shuffle and not available in the hash-based shuffle, but it may cause some trouble for the analysis script.

Reply: Checked with the analysis script. Except for get_shuffle_stat, the other analyses (sar, emon, time breakdown, app comparison) work fine. Perhaps we can introduce a new metric, shuffle write wall time, to capture the real time for both hash and sort shuffle, and modify the analysis script to use that one.

Reply: Will address in a follow-up PR.