Skip to content

Commit

Permalink
[GLUTEN-4903][CELEBORN] Support multiple versions of Celeborn (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk authored Mar 15, 2024
1 parent 0436cb8 commit 62746c4
Show file tree
Hide file tree
Showing 15 changed files with 511 additions and 76 deletions.
22 changes: 19 additions & 3 deletions .github/workflows/velox_be.yml
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,27 @@ jobs:
--local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=40g -s=10.0 --threads=32 --iterations=1'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn 0.4.0
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh \
'wget https://archive.apache.org/dist/incubator/celeborn/celeborn-0.3.0-incubating/apache-celeborn-0.3.0-incubating-bin.tgz && \
tar xzf apache-celeborn-0.3.0-incubating-bin.tgz -C /opt/ && mv /opt/apache-celeborn-0.3.0-incubating-bin /opt/celeborn && cd /opt/celeborn && \
'wget https://archive.apache.org/dist/incubator/celeborn/celeborn-0.4.0-incubating/apache-celeborn-0.4.0-incubating-bin.tgz && \
tar xzf apache-celeborn-0.4.0-incubating-bin.tgz -C /opt/ && mv /opt/apache-celeborn-0.4.0-incubating-bin /opt/celeborn && cd /opt/celeborn && \
mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \
echo -e "CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g" > ./conf/celeborn-env.sh && \
echo -e "celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64" > ./conf/celeborn-defaults.conf \
&& bash ./sbin/start-master.sh && bash ./sbin/start-worker.sh && \
cd /opt/gluten/tools/gluten-it && mvn clean install -Pspark-3.2,rss,celeborn-0.4 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-celeborn --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-celeborn --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 && \
bash /opt/celeborn/sbin/stop-worker.sh \
&& bash /opt/celeborn/sbin/stop-master.sh && rm -rf /opt/celeborn'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn 0.3.2
run: |
$PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh \
'wget https://archive.apache.org/dist/incubator/celeborn/celeborn-0.3.2-incubating/apache-celeborn-0.3.2-incubating-bin.tgz && \
tar xzf apache-celeborn-0.3.2-incubating-bin.tgz -C /opt/ && mv /opt/apache-celeborn-0.3.2-incubating-bin /opt/celeborn && cd /opt/celeborn && \
mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \
echo -e "CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g" > ./conf/celeborn-env.sh && \
echo -e "celeborn.worker.commitFiles.threads 128\nceleborn.worker.sortPartition.threads 64" > ./conf/celeborn-defaults.conf \
Expand Down
1 change: 1 addition & 0 deletions docs/get-started/ClickHouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ spark.dynamicAllocation.enabled false
```

#### Celeborn Columnar Shuffle Support
Currently, the supported Celeborn versions are `0.3.x` and `0.4.0`.
The native Celeborn support can be enabled by the following configuration
```
spark.shuffle.manager=org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager
Expand Down
4 changes: 3 additions & 1 deletion docs/get-started/Velox.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ Currently there are several ways to asscess S3 in Spark. Please refer [Velox S3]

## Celeborn support

Gluten with velox backend supports [Celeborn](https://github.com/apache/incubator-celeborn) as remote shuffle service. Below introduction is used to enable this feature
Gluten with velox backend supports [Celeborn](https://github.com/apache/incubator-celeborn) as remote shuffle service. Currently, the supported Celeborn versions are `0.3.x` and `0.4.0`.

Below introduction is used to enable this feature

First refer to this URL(https://github.com/apache/incubator-celeborn) to setup a celeborn cluster.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ import java.util
import java.util.Locale

class CHCelebornHashBasedColumnarShuffleWriter[K, V](
shuffleId: Int,
handle: CelebornShuffleHandle[K, V, V],
context: TaskContext,
celebornConf: CelebornConf,
client: ShuffleClient,
writeMetrics: ShuffleWriteMetricsReporter)
extends CelebornHashBasedColumnarShuffleWriter[K, V](
shuffleId: Int,
handle,
context,
celebornConf,
Expand Down Expand Up @@ -90,7 +92,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
"allocations from make() to split()")
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
val spilled = jniWrapper.evict(nativeShuffleWriter);
val spilled = jniWrapper.evict(nativeShuffleWriter)
logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
spilled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ class CHCelebornHashBasedColumnarShuffleWriterFactory extends CelebornShuffleWri
override def backendName(): String = CHBackend.BACKEND_NAME

override def createShuffleWriterInstance[K, V](
shuffleId: Int,
handle: CelebornShuffleHandle[K, V, V],
context: TaskContext,
celebornConf: CelebornConf,
client: ShuffleClient,
writeMetrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
new CHCelebornHashBasedColumnarShuffleWriter[K, V](
shuffleId,
handle,
context,
celebornConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.ShuffleMode;
import org.apache.spark.*;
import org.apache.spark.shuffle.*;
Expand All @@ -33,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -89,6 +89,12 @@ public class CelebornShuffleManager implements ShuffleManager {
ConcurrentHashMap.newKeySet();
private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;

// for Celeborn 0.4.0
private final Object shuffleIdTracker;

// for Celeborn 0.4.0
private boolean throwsFetchFailure;

public CelebornShuffleManager(SparkConf conf) {
if (conf.getBoolean(LOCAL_SHUFFLE_READER_KEY, true)) {
logger.warn(
Expand All @@ -99,6 +105,11 @@ public CelebornShuffleManager(SparkConf conf) {
this.conf = conf;
this.celebornConf = SparkUtils.fromSparkConf(conf);
this.fallbackPolicyRunner = new CelebornShuffleFallbackPolicyRunner(celebornConf);

this.shuffleIdTracker =
CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME);

this.throwsFetchFailure = CelebornUtils.getThrowsFetchFailure(celebornConf);
}

private boolean isDriver() {
Expand Down Expand Up @@ -129,58 +140,6 @@ private SparkShuffleManager vanillaCelebornShuffleManager() {
return _vanillaCelebornShuffleManager;
}

private ShuffleClient getShuffleClient(
String appUniqueId,
String lifecycleManagerHost,
Integer lifecycleManagerPort,
CelebornConf conf,
UserIdentifier userIdentifier,
Boolean isDriver) {
try {
try {
Method method =
// for Celeborn 0.3.1 and above, see CELEBORN-804
ShuffleClient.class.getDeclaredMethod(
"get",
String.class,
String.class,
int.class,
CelebornConf.class,
UserIdentifier.class);
return (ShuffleClient)
method.invoke(
null,
appUniqueId,
lifecycleManagerHost,
lifecycleManagerPort,
conf,
userIdentifier);
} catch (NoSuchMethodException noMethod) {
Method method =
// for Celeborn 0.3.0, see CELEBORN-798
ShuffleClient.class.getDeclaredMethod(
"get",
String.class,
String.class,
int.class,
CelebornConf.class,
UserIdentifier.class,
boolean.class);
return (ShuffleClient)
method.invoke(
null,
appUniqueId,
lifecycleManagerHost,
lifecycleManagerPort,
conf,
userIdentifier,
isDriver);
}
} catch (ReflectiveOperationException rethrow) {
throw new RuntimeException(rethrow);
}
}

private void initializeLifecycleManager() {
// Only create LifecycleManager singleton in Driver. When register shuffle multiple times, we
// need to ensure that LifecycleManager will only be created once. Parallelism needs to be
Expand All @@ -190,27 +149,33 @@ private void initializeLifecycleManager() {
synchronized (this) {
if (lifecycleManager == null) {
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);

// for Celeborn 0.4.0
CelebornUtils.registerShuffleTrackerCallback(throwsFetchFailure, lifecycleManager);

shuffleClient =
getShuffleClient(
CelebornUtils.getShuffleClient(
appUniqueId,
lifecycleManager.getHost(),
lifecycleManager.getPort(),
celebornConf,
lifecycleManager.getUserIdentifier(),
Boolean.TRUE);
Boolean.TRUE,
null);
}
}
}
}

private <K, V, C> ShuffleHandle registerCelebornShuffleHandle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
return new CelebornShuffleHandle<>(
return CelebornUtils.getCelebornShuffleHandle(
appUniqueId,
lifecycleManager.getHost(),
lifecycleManager.getPort(),
lifecycleManager.getUserIdentifier(),
shuffleId,
throwsFetchFailure,
dependency.rdd().getNumPartitions(),
dependency);
}
Expand All @@ -220,6 +185,10 @@ public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager();

// for Celeborn 0.4.0
CelebornUtils.registerAppShuffleDeterminate(lifecycleManager, shuffleId, dependency);

// Note: generate app unique id at driver side, make sure dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
Expand Down Expand Up @@ -248,13 +217,8 @@ public boolean unregisterShuffle(int shuffleId) {
return false;
}
}
if (appUniqueId == null) {
return true;
}
if (shuffleClient == null) {
return false;
}
return shuffleClient.unregisterShuffle(shuffleId, isDriver());
return CelebornUtils.unregisterShuffle(
lifecycleManager, shuffleClient, shuffleIdTracker, shuffleId, appUniqueId, isDriver());
}

@Override
Expand Down Expand Up @@ -284,24 +248,57 @@ public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
try {
if (handle instanceof CelebornShuffleHandle) {
byte[] extension;
try {
Field field = CelebornShuffleHandle.class.getDeclaredField("extension");
field.setAccessible(true);
extension = (byte[]) field.get(handle);

} catch (NoSuchFieldException e) {
extension = null;
}
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, V, V> h = ((CelebornShuffleHandle<K, V, V>) handle);
ShuffleClient client =
getShuffleClient(
CelebornUtils.getShuffleClient(
h.appUniqueId(),
h.lifecycleManagerHost(),
h.lifecycleManagerPort(),
celebornConf,
h.userIdentifier(),
false);
false,
extension);

int shuffleId;

// for Celeborn 0.4.0
try {
Method celebornShuffleIdMethod =
SparkUtils.class.getMethod(
"celebornShuffleId",
ShuffleClient.class,
CelebornShuffleHandle.class,
TaskContext.class,
boolean.class);
shuffleId = (int) celebornShuffleIdMethod.invoke(null, shuffleClient, h, context, true);

Method trackMethod =
CelebornUtils.getClassOrDefault(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME)
.getMethod("track", int.class, int.class);
trackMethod.invoke(shuffleIdTracker, h.shuffleId(), shuffleId);

} catch (NoSuchMethodException e) {
shuffleId = h.dependency().shuffleId();
}

if (!ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
throw new UnsupportedOperationException(
"Unrecognized shuffle write mode!" + celebornConf.shuffleWriterMode());
}
if (h.dependency() instanceof ColumnarShuffleDependency) {
// columnar-based shuffle
return writerFactory.createShuffleWriterInstance(
h, context, celebornConf, client, metrics);
shuffleId, h, context, celebornConf, client, metrics);
} else {
// row-based shuffle
return vanillaCelebornShuffleManager().getWriter(handle, mapId, context, metrics);
Expand All @@ -327,15 +324,16 @@ public <K, C> ShuffleReader<K, C> getReader(
if (handle instanceof CelebornShuffleHandle) {
@SuppressWarnings("unchecked")
CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) handle;
return new CelebornShuffleReader<>(
return CelebornUtils.getCelebornShuffleReader(
h,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
celebornConf,
metrics);
metrics,
shuffleIdTracker);
}
return columnarShuffleManager()
.getReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface CelebornShuffleWriterFactory {
String backendName();

<K, V> ShuffleWriter<K, V> createShuffleWriterInstance(
int shuffleId,
CelebornShuffleHandle<K, V, V> handle,
TaskContext context,
CelebornConf celebornConf,
Expand Down
Loading

0 comments on commit 62746c4

Please sign in to comment.