Skip to content

Commit

Permalink
[CELEBORN] Add compression for row-based shuffle (#6380)
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk authored Jul 11, 2024
1 parent 8383729 commit 62036bf
Showing 1 changed file with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class CelebornShuffleManager implements ShuffleManager {
private static final String LOCAL_SHUFFLE_READER_KEY =
"spark.sql.adaptive.localShuffleReader.enabled";

// Configuration key of the Celeborn shuffle compression codec, taken from
// CelebornConf.SHUFFLE_COMPRESSION_CODEC(). This un-prefixed form is the one set on CelebornConf
// instances.
private static final String CELEBORN_COMPRESSION_CODEC_KEY =
CelebornConf.SHUFFLE_COMPRESSION_CODEC().key();

// The same codec key with the "spark." prefix; this is the form read from and written to
// SparkConf instances.
private static final String SPARK_CELEBORN_COMPRESSION_CODEC_KEY =
"spark." + CELEBORN_COMPRESSION_CODEC_KEY;

private static final CelebornShuffleWriterFactory writerFactory;

static {
Expand All @@ -78,6 +84,8 @@ public class CelebornShuffleManager implements ShuffleManager {

private final SparkConf conf;
private final CelebornConf celebornConf;
// Clones of conf / celebornConf used for row-based (non-columnar) shuffle. When the configured
// Celeborn compression codec is "none", the constructor overrides the codec in these copies with
// Celeborn's default codec; otherwise they are unmodified clones.
// rowBasedConf is passed to the vanilla Celeborn shuffle manager, and rowBasedCelebornConf is
// used by getReader() when the dependency is not a ColumnarShuffleDependency.
private final SparkConf rowBasedConf;
private final CelebornConf rowBasedCelebornConf;
// either "{appId}_{appAttemptId}" or "{appId}"
private String appUniqueId;

Expand All @@ -89,6 +97,8 @@ public class CelebornShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;

// Default value string of CelebornConf.SHUFFLE_COMPRESSION_CODEC(). Used both as the fallback
// when the codec is not set in the Spark conf and as the replacement codec for row-based shuffle
// when the configured codec is "none".
private final String celebornDefaultCodec;

// for Celeborn 0.4.0
private final Object shuffleIdTracker;

Expand All @@ -110,6 +120,16 @@ public CelebornShuffleManager(SparkConf conf) {
CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME);

this.throwsFetchFailure = CelebornUtils.getThrowsFetchFailure(celebornConf);

this.celebornDefaultCodec = CelebornConf.SHUFFLE_COMPRESSION_CODEC().defaultValueString();

this.rowBasedConf = conf.clone();
this.rowBasedCelebornConf = celebornConf.clone();
if ("none"
.equalsIgnoreCase(conf.get(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec))) {
rowBasedConf.set(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
rowBasedCelebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
}
}

private boolean isDriver() {
Expand All @@ -133,7 +153,8 @@ private SparkShuffleManager vanillaCelebornShuffleManager() {
synchronized (this) {
if (_vanillaCelebornShuffleManager == null) {
_vanillaCelebornShuffleManager =
SparkUtils.instantiateClass(VANILLA_CELEBORN_SHUFFLE_MANAGER_NAME, conf, isDriver());
SparkUtils.instantiateClass(
VANILLA_CELEBORN_SHUFFLE_MANAGER_NAME, rowBasedConf, isDriver());
}
}
}
Expand Down Expand Up @@ -330,14 +351,18 @@ public <K, C> ShuffleReader<K, C> getReader(
if (handle instanceof CelebornShuffleHandle) {
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) handle;
CelebornConf readerConf = celebornConf;
if (!(h.dependency() instanceof ColumnarShuffleDependency)) {
readerConf = rowBasedCelebornConf;
}
return CelebornUtils.getCelebornShuffleReader(
h,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
celebornConf,
readerConf,
metrics,
shuffleIdTracker);
}
Expand Down

0 comments on commit 62036bf

Please sign in to comment.